From 17632d83cec7f503bf293c3cc3c8d0841b50a8f6 Mon Sep 17 00:00:00 2001
From: Martin Krasser
Date: Fri, 13 Dec 2013 10:59:01 +0100
Subject: [PATCH] =per #3771 Snapshot file names are only read on system
 startup

- snapshot file names are no longer cached
- they are read on every load attempt on a per-processor basis
---
 .../snapshot/local/LocalSnapshotStore.scala   | 95 +++++++++----
 1 file changed, 44 insertions(+), 51 deletions(-)

diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala
index 51bcf70cf4..c0d6321dba 100644
--- a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala
+++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala
@@ -8,7 +8,7 @@ package akka.persistence.snapshot.local
 import java.io._
 import java.net.{ URLDecoder, URLEncoder }
 
-import scala.collection.immutable.SortedSet
+import scala.collection.immutable
 import scala.concurrent.Future
 import scala.util._
 
@@ -31,49 +31,10 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo
   private val snapshotDir = new File(config.getString("dir"))
   private val serializationExtension = SerializationExtension(context.system)
 
-  private var snapshotMetadata = Map.empty[String, SortedSet[SnapshotMetadata]]
-
-  def loadAsync(processorId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] =
-    Future(load(processorId, criteria))(streamDispatcher)
-
-  def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] =
-    Future(save(metadata, snapshot))(streamDispatcher)
-
-  def saved(metadata: SnapshotMetadata) {
-    snapshotMetadata = snapshotMetadata + (snapshotMetadata.get(metadata.processorId) match {
-      case Some(mds) ⇒ metadata.processorId -> (mds + metadata)
-      case None      ⇒ metadata.processorId -> SortedSet(metadata)
-    })
-  }
-
-  def delete(metadata: SnapshotMetadata): Unit = {
-    snapshotMetadata = snapshotMetadata.get(metadata.processorId) match {
-      case Some(mds) ⇒ snapshotMetadata + (metadata.processorId -> (mds - metadata))
-      case None      ⇒ snapshotMetadata
-    }
-    snapshotFile(metadata).delete()
-  }
-
-  def delete(processorId: String, criteria: SnapshotSelectionCriteria) = {
-    snapshotMetadata.get(processorId) match {
-      case Some(mds) ⇒ mds.filter(criteria.matches).foreach(delete)
-      case None      ⇒
-    }
-  }
-
-  private def load(processorId: String, criteria: SnapshotSelectionCriteria): Option[SelectedSnapshot] = {
-    @scala.annotation.tailrec
-    def load(metadata: SortedSet[SnapshotMetadata]): Option[SelectedSnapshot] = metadata.lastOption match {
-      case None ⇒ None
-      case Some(md) ⇒
-        Try(withInputStream(md)(deserialize)) match {
-          case Success(s) ⇒ Some(SelectedSnapshot(md, s.data))
-          case Failure(e) ⇒
-            log.error(e, s"error loading snapshot ${md}")
-            load(metadata.init) // try older snapshot
-        }
-    }
+  private var saving = immutable.Set.empty[SnapshotMetadata] // saving in progress
 
+  def loadAsync(processorId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = {
+    //
     // Heuristics:
     //
     // Select youngest 3 snapshots that match upper bound. This may help in situations
@@ -82,8 +43,39 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo
     //   succeed.
     //
     // TODO: make number of loading attempts configurable
+    //
+    val metadata = snapshotMetadata(processorId, criteria).sorted.takeRight(3)
+    Future(load(metadata))(streamDispatcher)
+  }
 
-    for (md ← load(metadata(processorId).filter(criteria.matches).takeRight(3))) yield md
+  def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = {
+    saving += metadata
+    Future(save(metadata, snapshot))(streamDispatcher)
+  }
+
+  def saved(metadata: SnapshotMetadata): Unit = {
+    saving -= metadata
+  }
+
+  def delete(metadata: SnapshotMetadata): Unit = {
+    saving -= metadata
+    snapshotFile(metadata).delete()
+  }
+
+  def delete(processorId: String, criteria: SnapshotSelectionCriteria) = {
+    snapshotMetadata(processorId, criteria).foreach(delete)
+  }
+
+  @scala.annotation.tailrec
+  private def load(metadata: immutable.Seq[SnapshotMetadata]): Option[SelectedSnapshot] = metadata.lastOption match {
+    case None ⇒ None
+    case Some(md) ⇒
+      Try(withInputStream(md)(deserialize)) match {
+        case Success(s) ⇒ Some(SelectedSnapshot(md, s.data))
+        case Failure(e) ⇒
+          log.error(e, s"error loading snapshot ${md}")
+          load(metadata.init) // try older snapshot
+      }
   }
 
   private def save(metadata: SnapshotMetadata, snapshot: Any): Unit =
@@ -107,16 +99,17 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo
   private def snapshotFile(metadata: SnapshotMetadata): File =
     new File(snapshotDir, s"snapshot-${URLEncoder.encode(metadata.processorId, "UTF-8")}-${metadata.sequenceNr}-${metadata.timestamp}")
 
-  private def metadata(processorId: String): SortedSet[SnapshotMetadata] =
-    snapshotMetadata.getOrElse(processorId, SortedSet.empty)
-
-  private def metadata: Seq[SnapshotMetadata] = snapshotDir.listFiles.map(_.getName).collect {
-    case FilenamePattern(pid, snr, tms) ⇒ SnapshotMetadata(URLDecoder.decode(pid, "UTF-8"), snr.toLong, tms.toLong)
-  }
+  private def snapshotMetadata(processorId: String, criteria: SnapshotSelectionCriteria): immutable.Seq[SnapshotMetadata] =
+    snapshotDir.listFiles(new SnapshotFilenameFilter(processorId)).map(_.getName).collect {
+      case FilenamePattern(pid, snr, tms) ⇒ SnapshotMetadata(URLDecoder.decode(pid, "UTF-8"), snr.toLong, tms.toLong)
+    }.filter(md ⇒ criteria.matches(md) && !saving.contains(md)).toVector
 
   override def preStart() {
     if (!snapshotDir.exists) snapshotDir.mkdirs()
-    snapshotMetadata = SortedSet.empty ++ metadata groupBy (_.processorId)
     super.preStart()
   }
+
+  private class SnapshotFilenameFilter(processorId: String) extends FilenameFilter {
+    def accept(dir: File, name: String): Boolean = name.startsWith(s"snapshot-${URLEncoder.encode(processorId, "UTF-8")}")
+  }
 }
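
Note (reviewer sketch, not part of the patch): the loading heuristic above can be illustrated in isolation. The standalone Scala sketch below mimics the patched selection and fallback logic, using a hypothetical Meta case class and a fake deserialize step in place of SnapshotMetadata and withInputStream(md)(deserialize); it is a simplified illustration under those assumptions, not the store implementation itself.

import scala.annotation.tailrec
import scala.collection.immutable
import scala.util.{ Failure, Success, Try }

object LoadHeuristicSketch extends App {
  // Hypothetical stand-in for SnapshotMetadata, ordered by sequence number only.
  case class Meta(sequenceNr: Long)

  // Stand-in for withInputStream(md)(deserialize): pretend the newest snapshot file is corrupt.
  def deserialize(md: Meta): Try[String] =
    if (md.sequenceNr == 3L) Failure(new java.io.IOException("corrupt snapshot file"))
    else Success(s"snapshot-${md.sequenceNr}")

  // Same shape as the patched LocalSnapshotStore.load: try the youngest candidate,
  // fall back to the next older one when deserialization fails.
  @tailrec
  def load(metadata: immutable.Seq[Meta]): Option[String] = metadata.lastOption match {
    case None ⇒ None
    case Some(md) ⇒
      deserialize(md) match {
        case Success(s) ⇒ Some(s)
        case Failure(_) ⇒ load(metadata.init) // try older snapshot
      }
  }

  // Mirrors loadAsync's candidate selection: youngest 3 matching snapshots.
  val candidates = Vector(Meta(1L), Meta(2L), Meta(3L)).sortBy(_.sequenceNr).takeRight(3)
  println(load(candidates)) // Some(snapshot-2): the corrupt snapshot 3 is skipped
}

The same picture explains the saving set introduced by the patch: snapshots whose save is still in progress are excluded from the candidates in snapshotMetadata, so a concurrent load attempt never picks up a half-written file.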