=per #18112 Use timestamp in deleteSnapshots

It was actually used for finding the right metadata, but the local store
deleted all files with matching seqNr.
Note that we use 0L as undefined value for the timestamp when deleting
single snapshot (and therefore it makes sense to delete all in that case)
This commit is contained in:
Patrik Nordwall 2015-08-14 12:42:25 +02:00
parent fd43ac7b8d
commit 0cd94e2c8c
3 changed files with 28 additions and 13 deletions

View file

@ -81,8 +81,7 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo
}
private def snapshotFiles(metadata: SnapshotMetadata): immutable.Seq[File] = {
// pick all files for this persistenceId and sequenceNr, old journals could have created multiple entries with appended timestamps
snapshotDir.listFiles(new SnapshotSeqNrFilenameFilter(metadata.persistenceId, metadata.sequenceNr)).toVector
snapshotDir.listFiles(new SnapshotSeqNrFilenameFilter(metadata)).toVector
}
@scala.annotation.tailrec
@ -156,13 +155,15 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo
}
}
private final class SnapshotSeqNrFilenameFilter(persistenceId: String, sequenceNr: Long) extends FilenameFilter {
private final def matches(pid: String, snr: String): Boolean =
pid.equals(URLEncoder.encode(persistenceId)) && Try(snr.toLong == sequenceNr).getOrElse(false)
private final class SnapshotSeqNrFilenameFilter(md: SnapshotMetadata) extends FilenameFilter {
private final def matches(pid: String, snr: String, tms: String): Boolean = {
pid.equals(URLEncoder.encode(md.persistenceId)) &&
Try(snr.toLong == md.sequenceNr && (md.timestamp == 0L || tms.toLong == md.timestamp)).getOrElse(false)
}
def accept(dir: File, name: String): Boolean =
name match {
case FilenamePattern(pid, snr, tms) matches(pid, snr)
case FilenamePattern(pid, snr, tms) matches(pid, snr, tms)
case _ false
}

View file

@ -5,7 +5,7 @@
package akka.persistence.fsm
import akka.actor._
import akka.persistence.{PersistentActor, RecoveryCompleted, PersistenceSpec}
import akka.persistence.{ PersistentActor, RecoveryCompleted, PersistenceSpec }
import akka.persistence.fsm.PersistentFSM._
import akka.testkit._
import com.typesafe.config.Config
@ -266,13 +266,13 @@ abstract class PersistentFSMSpec(config: Config) extends PersistenceSpec(config)
val persistentEventsStreamer = system.actorOf(PersistentEventsStreamer.props(persistenceId, testActor))
expectMsg(ItemAdded(Item("1", "Shirt", 59.99F)))
expectMsgType[StateChangeEvent] //because a timeout is defined, State Change is persisted
expectMsgType[StateChangeEvent] //because a timeout is defined, State Change is persisted
expectMsg(ItemAdded(Item("2", "Shoes", 89.99F)))
expectMsgType[StateChangeEvent] //because a timeout is defined, State Change is persisted
expectMsgType[StateChangeEvent] //because a timeout is defined, State Change is persisted
expectMsg(ItemAdded(Item("3", "Coat", 119.99F)))
expectMsgType[StateChangeEvent] //because a timeout is defined, State Change is persisted
expectMsgType[StateChangeEvent] //because a timeout is defined, State Change is persisted
expectMsg(OrderExecuted)
expectMsgType[StateChangeEvent]
@ -431,13 +431,13 @@ object PersistentFSMSpec {
def props(persistenceId: String, reportActor: ActorRef) =
Props(new WebStoreCustomerFSM(persistenceId, reportActor))
}
class PersistentEventsStreamer(id: String, client: ActorRef) extends PersistentActor {
override val persistenceId: String = id
def receiveRecover = {
case RecoveryCompleted // do nothing
case persistentEvent client ! persistentEvent
case RecoveryCompleted // do nothing
case persistentEvent client ! persistentEvent
}
def receiveCommand = {