Merge pull request #1890 from krasserm/wip-3771-sync-snapshot-filenames-krasserm

=per #3771 Snapshot file names are only read on system startup
This commit is contained in:
Patrik Nordwall 2013-12-16 01:28:01 -08:00
commit b74103a013

View file

@ -8,7 +8,7 @@ package akka.persistence.snapshot.local
import java.io._
import java.net.{ URLDecoder, URLEncoder }
import scala.collection.immutable.SortedSet
import scala.collection.immutable
import scala.concurrent.Future
import scala.util._
@ -31,49 +31,10 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo
private val snapshotDir = new File(config.getString("dir"))
private val serializationExtension = SerializationExtension(context.system)
private var snapshotMetadata = Map.empty[String, SortedSet[SnapshotMetadata]]
def loadAsync(processorId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] =
Future(load(processorId, criteria))(streamDispatcher)
def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] =
Future(save(metadata, snapshot))(streamDispatcher)
def saved(metadata: SnapshotMetadata) {
snapshotMetadata = snapshotMetadata + (snapshotMetadata.get(metadata.processorId) match {
case Some(mds) metadata.processorId -> (mds + metadata)
case None metadata.processorId -> SortedSet(metadata)
})
}
def delete(metadata: SnapshotMetadata): Unit = {
snapshotMetadata = snapshotMetadata.get(metadata.processorId) match {
case Some(mds) snapshotMetadata + (metadata.processorId -> (mds - metadata))
case None snapshotMetadata
}
snapshotFile(metadata).delete()
}
def delete(processorId: String, criteria: SnapshotSelectionCriteria) = {
snapshotMetadata.get(processorId) match {
case Some(mds) mds.filter(criteria.matches).foreach(delete)
case None
}
}
private def load(processorId: String, criteria: SnapshotSelectionCriteria): Option[SelectedSnapshot] = {
@scala.annotation.tailrec
def load(metadata: SortedSet[SnapshotMetadata]): Option[SelectedSnapshot] = metadata.lastOption match {
case None None
case Some(md)
Try(withInputStream(md)(deserialize)) match {
case Success(s) Some(SelectedSnapshot(md, s.data))
case Failure(e)
log.error(e, s"error loading snapshot ${md}")
load(metadata.init) // try older snapshot
}
}
private var saving = immutable.Set.empty[SnapshotMetadata] // saving in progress
def loadAsync(processorId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = {
//
// Heuristics:
//
// Select youngest 3 snapshots that match upper bound. This may help in situations
@ -82,8 +43,39 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo
// succeed.
//
// TODO: make number of loading attempts configurable
//
val metadata = snapshotMetadata(processorId, criteria).sorted.takeRight(3)
Future(load(metadata))(streamDispatcher)
}
for (md load(metadata(processorId).filter(criteria.matches).takeRight(3))) yield md
def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = {
saving += metadata
Future(save(metadata, snapshot))(streamDispatcher)
}
def saved(metadata: SnapshotMetadata): Unit = {
saving -= metadata
}
def delete(metadata: SnapshotMetadata): Unit = {
saving -= metadata
snapshotFile(metadata).delete()
}
def delete(processorId: String, criteria: SnapshotSelectionCriteria) = {
snapshotMetadata(processorId, criteria).foreach(delete)
}
@scala.annotation.tailrec
private def load(metadata: immutable.Seq[SnapshotMetadata]): Option[SelectedSnapshot] = metadata.lastOption match {
case None None
case Some(md)
Try(withInputStream(md)(deserialize)) match {
case Success(s) Some(SelectedSnapshot(md, s.data))
case Failure(e)
log.error(e, s"error loading snapshot ${md}")
load(metadata.init) // try older snapshot
}
}
private def save(metadata: SnapshotMetadata, snapshot: Any): Unit =
@ -107,16 +99,17 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo
private def snapshotFile(metadata: SnapshotMetadata): File =
new File(snapshotDir, s"snapshot-${URLEncoder.encode(metadata.processorId, "UTF-8")}-${metadata.sequenceNr}-${metadata.timestamp}")
private def metadata(processorId: String): SortedSet[SnapshotMetadata] =
snapshotMetadata.getOrElse(processorId, SortedSet.empty)
private def metadata: Seq[SnapshotMetadata] = snapshotDir.listFiles.map(_.getName).collect {
case FilenamePattern(pid, snr, tms) SnapshotMetadata(URLDecoder.decode(pid, "UTF-8"), snr.toLong, tms.toLong)
}
private def snapshotMetadata(processorId: String, criteria: SnapshotSelectionCriteria): immutable.Seq[SnapshotMetadata] =
snapshotDir.listFiles(new SnapshotFilenameFilter(processorId)).map(_.getName).collect {
case FilenamePattern(pid, snr, tms) SnapshotMetadata(URLDecoder.decode(pid, "UTF-8"), snr.toLong, tms.toLong)
}.filter(md criteria.matches(md) && !saving.contains(md)).toVector
override def preStart() {
if (!snapshotDir.exists) snapshotDir.mkdirs()
snapshotMetadata = SortedSet.empty ++ metadata groupBy (_.processorId)
super.preStart()
}
private class SnapshotFilenameFilter(processorId: String) extends FilenameFilter {
def accept(dir: File, name: String): Boolean = name.startsWith(s"snapshot-${URLEncoder.encode(processorId, "UTF-8")}")
}
}