!per #17586 async snapshot delete and remove timestamp from deleteSnapshot()

This commit is contained in:
Konrad Malawski 2015-06-03 15:56:00 +02:00
parent 156204aa81
commit 63baaf1b2b
16 changed files with 280 additions and 92 deletions

View file

@ -8,6 +8,7 @@ import java.io.File;
import java.util.ArrayList;
import java.util.List;
import akka.actor.*;
import akka.dispatch.Futures;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.iq80.leveldb.util.FileUtils;
@ -76,11 +77,13 @@ public class PersistencePluginDocTest {
}
@Override
public void doDelete(SnapshotMetadata metadata) throws Exception {
public Future<Void> doDelete(SnapshotMetadata metadata) throws Exception {
return Futures.successful(null);
}
@Override
public void doDelete(String persistenceId, SnapshotSelectionCriteria criteria) throws Exception {
public Future<Void> doDelete(String persistenceId, SnapshotSelectionCriteria criteria) throws Exception {
return Futures.successful(null);
}
}

View file

@ -8,7 +8,6 @@ Migration Guide Akka Persistence (experimental) 2.3.3 to 2.3.4 (and 2.4.x)
is provided for Persistence while under the *experimental* flag. The goal of this phase is to gather user feedback
before we freeze the APIs in a major release.
defer renamed to deferAsync
===========================
The ``defer`` method in ``PersistentActor`` was renamed to ``deferAsync`` as it matches the semantics
@ -204,3 +203,17 @@ To continue using LevelDB based persistence plugins it is now required for relat
to include an additional explicit dependency declaration for the LevelDB artifacts.
This change allows production akka deployments to avoid need for the LevelDB provisioning.
Please see persistence extension ``reference.conf`` for details.
SnapshotStore: Snapshots can now be deleted asynchronously (and report failures)
================================================================================
Previously the ``SnapshotStore`` plugin SPI did not allow for asynchronous deletion of snapshots,
and failures of deleting a snapshot may have been even silently ignored.
Now ``SnapshotStore``s must return a ``Future`` representing the deletion of the snapshot.
If this future completes successfully the ``PersistentActor`` which initiated the snapshotting will
be notified via an ``DeleteSnapshotSuccess`` message. If the deletion fails for some reason a ``DeleteSnapshotFailure``
will be sent to the actor instead.
For ``criteria`` based deletion of snapshots (``def deleteSnapshots(criteria: SnapshotSelectionCriteria)``) equivalent
``DeleteSnapshotsSuccess`` and ``DeleteSnapshotsFailure`` messages are sent, which contain the specified criteria,
instead of ``SnapshotMetadata`` as is the case with the single snapshot deletion messages.

View file

@ -139,8 +139,8 @@ class MySnapshotStore extends SnapshotStore {
criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = ???
def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = ???
def saved(metadata: SnapshotMetadata): Unit = ???
def delete(metadata: SnapshotMetadata): Unit = ???
def delete(persistenceId: String, criteria: SnapshotSelectionCriteria): Unit = ???
def deleteAsync(metadata: SnapshotMetadata): Future[Unit] = ???
def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = ???
}
object PersistenceTCKDoc {

View file

@ -12,4 +12,4 @@ class LocalSnapshotStoreSpec extends SnapshotStoreSpec(
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/snapshots"
"""))
with PluginCleanup
with PluginCleanup

View file

@ -5,6 +5,7 @@
package akka.persistence.journal.japi;
import akka.persistence.*;
import scala.concurrent.Future;
interface SyncWritePlugin {
//#sync-write-plugin-api

View file

@ -45,7 +45,7 @@ interface SnapshotStorePlugin {
* @param metadata
* snapshot metadata.
*/
void doDelete(SnapshotMetadata metadata) throws Exception;
Future<Void> doDelete(SnapshotMetadata metadata) throws Exception;
/**
* Java API, Plugin API: deletes all snapshots matching `criteria`.
@ -55,6 +55,6 @@ interface SnapshotStorePlugin {
* @param criteria
* selection criteria for deleting.
*/
void doDelete(String persistenceId, SnapshotSelectionCriteria criteria) throws Exception;
Future<Void> doDelete(String persistenceId, SnapshotSelectionCriteria criteria) throws Exception;
//#snapshot-store-plugin-api
}

View file

@ -10,7 +10,7 @@ package akka.persistence
*
* @param persistenceId id of persistent actor from which the snapshot was taken.
* @param sequenceNr sequence number at which the snapshot was taken.
* @param timestamp time at which the snapshot was saved.
* @param timestamp time at which the snapshot was saved, defaults to 0 when unknown.
*/
@SerialVersionUID(1L) //
//#snapshot-metadata
@ -26,6 +26,24 @@ final case class SnapshotMetadata(persistenceId: String, sequenceNr: Long, times
final case class SaveSnapshotSuccess(metadata: SnapshotMetadata)
extends SnapshotProtocol.Response
/**
* Sent to a [[PersistentActor]] after successful deletion of a snapshot.
*
* @param metadata snapshot metadata.
*/
@SerialVersionUID(1L)
final case class DeleteSnapshotSuccess(metadata: SnapshotMetadata)
extends SnapshotProtocol.Response
/**
* Sent to a [[PersistentActor]] after successful deletion of specified range of snapshots.
*
* @param criteria snapshot selection criteria.
*/
@SerialVersionUID(1L)
final case class DeleteSnapshotsSuccess(criteria: SnapshotSelectionCriteria)
extends SnapshotProtocol.Response
/**
* Sent to a [[PersistentActor]] after failed saving of a snapshot.
*
@ -36,6 +54,26 @@ final case class SaveSnapshotSuccess(metadata: SnapshotMetadata)
final case class SaveSnapshotFailure(metadata: SnapshotMetadata, cause: Throwable)
extends SnapshotProtocol.Response
/**
* Sent to a [[PersistentActor]] after failed deletion of a snapshot.
*
* @param metadata snapshot metadata.
* @param cause failure cause.
*/
@SerialVersionUID(1L)
final case class DeleteSnapshotFailure(metadata: SnapshotMetadata, cause: Throwable)
extends SnapshotProtocol.Response
/**
* Sent to a [[PersistentActor]] after failed deletion of a range of snapshots.
*
* @param criteria snapshot selection criteria.
* @param cause failure cause.
*/
@SerialVersionUID(1L)
final case class DeleteSnapshotsFailure(criteria: SnapshotSelectionCriteria, cause: Throwable)
extends SnapshotProtocol.Response
/**
* Offers a [[PersistentActor]] a previously saved `snapshot` during recovery. This offer is received
* before any further replayed messages.

View file

@ -25,26 +25,38 @@ trait Snapshotter extends Actor {
*/
def snapshotSequenceNr: Long
/**
* Instructs the snapshot store to load the specified snapshot and send it via an [[SnapshotOffer]]
* to the running [[PersistentActor]].
*/
def loadSnapshot(persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long) =
snapshotStore ! LoadSnapshot(persistenceId, criteria, toSequenceNr)
/**
* Saves a `snapshot` of this snapshotter's state. If saving succeeds, this snapshotter will receive a
* [[SaveSnapshotSuccess]] message, otherwise a [[SaveSnapshotFailure]] message.
* Saves a `snapshot` of this snapshotter's state.
*
* The [[PersistentActor]] will be notified about the success or failure of this
* via an [[SaveSnapshotSuccess]] or [[SaveSnapshotFailure]] message.
*/
def saveSnapshot(snapshot: Any): Unit = {
snapshotStore ! SaveSnapshot(SnapshotMetadata(snapshotterId, snapshotSequenceNr), snapshot)
}
/**
* Deletes a snapshot identified by `sequenceNr` and `timestamp`.
* Deletes the snapshot identified by `sequenceNr`.
*
* The [[PersistentActor]] will be notified about the status of the deletion
* via an [[DeleteSnapshotSuccess]] or [[DeleteSnapshotFailure]] message.
*/
def deleteSnapshot(sequenceNr: Long, timestamp: Long): Unit = {
snapshotStore ! DeleteSnapshot(SnapshotMetadata(snapshotterId, sequenceNr, timestamp))
def deleteSnapshot(sequenceNr: Long): Unit = {
snapshotStore ! DeleteSnapshot(SnapshotMetadata(snapshotterId, sequenceNr))
}
/**
* Deletes all snapshots matching `criteria`.
*
* The [[PersistentActor]] will be notified about the status of the deletion
* via an [[DeleteSnapshotsSuccess]] or [[DeleteSnapshotsFailure]] message.
*/
def deleteSnapshots(criteria: SnapshotSelectionCriteria): Unit = {
snapshotStore ! DeleteSnapshots(snapshotterId, criteria)

View file

@ -10,13 +10,16 @@ import scala.collection.JavaConverters._
import akka.persistence._
import akka.persistence.journal.{ SyncWriteJournal SSyncWriteJournal }
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
/**
* Java API: abstract journal, optimized for synchronous writes.
*/
abstract class SyncWriteJournal extends AsyncRecovery with SSyncWriteJournal with SyncWritePlugin {
final def writeMessages(messages: immutable.Seq[PersistentRepr]) =
final def writeMessages(messages: immutable.Seq[PersistentRepr]): Unit =
doWriteMessages(messages.asJava)
final def deleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean) =
final def deleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Unit =
doDeleteMessagesTo(persistenceId, toSequenceNr, permanent)
}

View file

@ -14,7 +14,7 @@ import akka.persistence._
/**
* Abstract snapshot store.
*/
trait SnapshotStore extends Actor {
trait SnapshotStore extends Actor with ActorLogging {
import SnapshotProtocol._
import context.dispatcher
@ -29,6 +29,7 @@ trait SnapshotStore extends Actor {
} recover {
case e LoadSnapshotResult(None, toSequenceNr)
} pipeTo p
case SaveSnapshot(metadata, snapshot)
val p = sender()
val md = metadata.copy(timestamp = System.currentTimeMillis)
@ -37,18 +38,34 @@ trait SnapshotStore extends Actor {
} recover {
case e SaveSnapshotFailure(metadata, e)
} to (self, p)
case evt @ SaveSnapshotSuccess(metadata)
saved(metadata)
sender() ! evt // sender is persistentActor
try saved(metadata) finally sender() ! evt // sender is persistentActor
case evt @ SaveSnapshotFailure(metadata, _)
delete(metadata)
sender() ! evt // sender is persistentActor
try deleteAsync(metadata) finally sender() ! evt // sender is persistentActor
case d @ DeleteSnapshot(metadata)
delete(metadata)
if (publish) context.system.eventStream.publish(d)
val p = sender()
deleteAsync(metadata) map {
case _
log.warning("deleting by: " + d)
DeleteSnapshotSuccess(metadata)
} recover {
case e DeleteSnapshotFailure(metadata, e)
} pipeTo p onComplete {
case _ if publish context.system.eventStream.publish(d)
}
case d @ DeleteSnapshots(persistenceId, criteria)
delete(persistenceId, criteria)
if (publish) context.system.eventStream.publish(d)
val p = sender()
deleteAsync(persistenceId, criteria) map {
case _ DeleteSnapshotsSuccess(criteria)
} recover {
case e DeleteSnapshotsFailure(criteria, e)
} pipeTo p onComplete {
case _ if publish context.system.eventStream.publish(d)
}
}
//#snapshot-store-plugin-api
@ -73,7 +90,7 @@ trait SnapshotStore extends Actor {
*
* @param metadata snapshot metadata.
*/
def saved(metadata: SnapshotMetadata)
def saved(metadata: SnapshotMetadata): Unit
/**
* Plugin API: deletes the snapshot identified by `metadata`.
@ -81,7 +98,7 @@ trait SnapshotStore extends Actor {
* @param metadata snapshot metadata.
*/
def delete(metadata: SnapshotMetadata)
def deleteAsync(metadata: SnapshotMetadata): Future[Unit]
/**
* Plugin API: deletes all snapshots matching `criteria`.
@ -89,6 +106,6 @@ trait SnapshotStore extends Actor {
* @param persistenceId id of the persistent actor.
* @param criteria selection criteria for deleting.
*/
def delete(persistenceId: String, criteria: SnapshotSelectionCriteria)
def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit]
//#snapshot-store-plugin-api
}

View file

@ -4,12 +4,12 @@
package akka.persistence.snapshot.japi
import scala.concurrent.Future
import akka.japi.{ Option JOption }
import akka.persistence._
import akka.persistence.snapshot.{ SnapshotStore SSnapshotStore }
import scala.concurrent.Future
/**
* Java API: abstract snapshot store.
*/
@ -22,13 +22,13 @@ abstract class SnapshotStore extends SSnapshotStore with SnapshotStorePlugin {
final def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] =
doSaveAsync(metadata, snapshot).map(Unit.unbox)
final def saved(metadata: SnapshotMetadata) =
final def saved(metadata: SnapshotMetadata): Unit =
onSaved(metadata)
final def delete(metadata: SnapshotMetadata) =
doDelete(metadata)
final def delete(metadata: SnapshotMetadata): Future[Unit] =
doDelete(metadata).map(_ ())
final def delete(persistenceId: String, criteria: SnapshotSelectionCriteria) =
doDelete(persistenceId: String, criteria: SnapshotSelectionCriteria)
final def delete(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] =
doDelete(persistenceId: String, criteria: SnapshotSelectionCriteria).map(_ ())
}

View file

@ -8,17 +8,17 @@ package akka.persistence.snapshot.local
import java.io._
import java.net.{ URLDecoder, URLEncoder }
import akka.actor.ActorLogging
import akka.persistence._
import akka.persistence.serialization._
import akka.persistence.snapshot._
import akka.serialization.SerializationExtension
import akka.util.ByteString.UTF_8
import scala.collection.immutable
import scala.concurrent.Future
import scala.util._
import akka.actor.ActorLogging
import akka.persistence._
import akka.persistence.snapshot._
import akka.persistence.serialization._
import akka.serialization.SerializationExtension
import akka.util.ByteString.UTF_8
/**
* INTERNAL API.
*
@ -45,7 +45,7 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo
//
// TODO: make number of loading attempts configurable
//
val metadata = snapshotMetadata(persistenceId, criteria).sorted.takeRight(3)
val metadata = snapshotMetadatas(persistenceId, criteria).sorted.takeRight(3)
Future(load(metadata))(streamDispatcher)
}
@ -58,13 +58,26 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo
saving -= metadata
}
def delete(metadata: SnapshotMetadata): Unit = {
def deleteAsync(metadata: SnapshotMetadata): Future[Unit] = {
saving -= metadata
snapshotFile(metadata).delete()
Future {
// multiple snapshot files here mean that there were multiple snapshots for this seqNr, we delete all of them
// usually snapshot-stores would keep one snapshot per sequenceNr however here in the file-based one we timestamp
// snapshots and allow multiple to be kept around (for the same seqNr) if desired
snapshotFiles(metadata).map(_.delete())
}(streamDispatcher).map(_ ())(streamDispatcher)
}
def delete(persistenceId: String, criteria: SnapshotSelectionCriteria) = {
snapshotMetadata(persistenceId, criteria).foreach(delete)
def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = {
val metadatas = snapshotMetadatas(persistenceId, criteria)
Future.sequence {
metadatas.map(deleteAsync)
}(collection.breakOut, streamDispatcher).map(_ ())(streamDispatcher)
}
private def snapshotFiles(metadata: SnapshotMetadata): immutable.Seq[File] = {
// pick all files for this persistenceId and sequenceNr, old journals could have created multiple entries with appended timestamps
snapshotDir.listFiles(new SnapshotSeqNrFilenameFilter(metadata.persistenceId, metadata.sequenceNr)).toVector
}
@scala.annotation.tailrec
@ -81,7 +94,7 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo
protected def save(metadata: SnapshotMetadata, snapshot: Any): Unit = {
val tmpFile = withOutputStream(metadata)(serialize(_, Snapshot(snapshot)))
tmpFile.renameTo(snapshotFile(metadata))
tmpFile.renameTo(snapshotFileForWrite(metadata))
}
protected def deserialize(inputStream: InputStream): Snapshot =
@ -91,21 +104,22 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo
outputStream.write(serializationExtension.findSerializerFor(snapshot).toBinary(snapshot))
protected def withOutputStream(metadata: SnapshotMetadata)(p: (OutputStream) Unit): File = {
val tmpFile = snapshotFile(metadata, extension = "tmp")
val tmpFile = snapshotFileForWrite(metadata, extension = "tmp")
withStream(new BufferedOutputStream(new FileOutputStream(tmpFile)), p)
tmpFile
}
private def withInputStream[T](metadata: SnapshotMetadata)(p: (InputStream) T): T =
withStream(new BufferedInputStream(new FileInputStream(snapshotFile(metadata))), p)
withStream(new BufferedInputStream(new FileInputStream(snapshotFileForWrite(metadata))), p)
private def withStream[A <: Closeable, B](stream: A, p: A B): B =
try { p(stream) } finally { stream.close() }
private def snapshotFile(metadata: SnapshotMetadata, extension: String = ""): File =
/** Only by persistenceId and sequenceNr, timestamp is informational - accomodates for 2.13.x series files */
private def snapshotFileForWrite(metadata: SnapshotMetadata, extension: String = ""): File =
new File(snapshotDir, s"snapshot-${URLEncoder.encode(metadata.persistenceId, UTF_8)}-${metadata.sequenceNr}-${metadata.timestamp}${extension}")
private def snapshotMetadata(persistenceId: String, criteria: SnapshotSelectionCriteria): immutable.Seq[SnapshotMetadata] = {
private def snapshotMetadatas(persistenceId: String, criteria: SnapshotSelectionCriteria): immutable.Seq[SnapshotMetadata] = {
val files = snapshotDir.listFiles(new SnapshotFilenameFilter(persistenceId))
if (files eq null) Nil // if the dir was removed
else files.map(_.getName).collect {
@ -128,11 +142,24 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo
dir
}
private class SnapshotFilenameFilter(persistenceId: String) extends FilenameFilter {
def accept(dir: File, name: String): Boolean =
private final class SnapshotFilenameFilter(persistenceId: String) extends FilenameFilter {
def accept(dir: File, name: String): Boolean = {
name match {
case FilenamePattern(pid, snr, tms) pid.equals(URLEncoder.encode(persistenceId))
case _ false
}
}
}
private final class SnapshotSeqNrFilenameFilter(persistenceId: String, sequenceNr: Long) extends FilenameFilter {
private final def matches(pid: String, snr: String): Boolean =
pid.equals(URLEncoder.encode(persistenceId)) && Try(snr.toLong == sequenceNr).getOrElse(false)
def accept(dir: File, name: String): Boolean =
name match {
case FilenamePattern(pid, snr, tms) matches(pid, snr)
case _ false
}
}
}

View file

@ -4,18 +4,22 @@
package akka.persistence
import akka.actor.{ Props, ActorRef }
import akka.testkit.{ TestEvent, EventFilter, ImplicitSender, AkkaSpec }
import scala.concurrent.duration._
import akka.persistence.snapshot.local.LocalSnapshotStore
import akka.persistence.serialization.Snapshot
import akka.event.Logging
import java.io.IOException
import akka.actor.{ ActorRef, Props }
import akka.event.Logging
import akka.persistence.snapshot.local.LocalSnapshotStore
import akka.testkit.{ EventFilter, ImplicitSender, TestEvent }
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.postfixOps
object SnapshotFailureRobustnessSpec {
case class Cmd(payload: String)
case class DeleteSnapshot(seqNr: Int)
case class DeleteSnapshots(criteria: SnapshotSelectionCriteria)
class SaveSnapshotTestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) {
override def receiveRecover: Receive = {
@ -30,6 +34,26 @@ object SnapshotFailureRobustnessSpec {
}
}
class DeleteSnapshotTestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) {
// TODO do we call it "snapshot store" or "snapshot plugin", small inconsistency here
override def snapshotPluginId: String =
"akka.persistence.snapshot-store.local-delete-fail"
override def receiveRecover: Receive = {
case SnapshotOffer(md, s) probe ! ((md, s))
case other probe ! other
}
override def receiveCommand = {
case Cmd(payload) persist(payload)(_ saveSnapshot(payload))
case DeleteSnapshot(seqNr) deleteSnapshot(seqNr)
case DeleteSnapshots(crit) deleteSnapshots(crit)
case SaveSnapshotSuccess(md) probe ! md.sequenceNr
case other probe ! other
}
}
class LoadSnapshotTestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) {
override def receiveRecover: Receive = {
case SnapshotOffer(md, s) probe ! ((md, s))
@ -56,11 +80,24 @@ object SnapshotFailureRobustnessSpec {
} else super.save(metadata, snapshot)
}
}
class DeleteFailingLocalSnapshotStore extends LocalSnapshotStore {
override def deleteAsync(metadata: SnapshotMetadata): Future[Unit] = {
super.deleteAsync(metadata) // we actually delete it properly, but act as if it failed
Future.failed(new IOException("Failed to delete snapshot for some reason!"))
}
override def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = {
super.deleteAsync(persistenceId, criteria) // we actually delete it properly, but act as if it failed
Future.failed(new IOException("Failed to delete snapshot for some reason!"))
}
}
}
class SnapshotFailureRobustnessSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "SnapshotFailureRobustnessSpec", serialization = "off", extraConfig = Some(
"""
akka.persistence.snapshot-store.local.class = "akka.persistence.SnapshotFailureRobustnessSpec$FailingLocalSnapshotStore"
akka.persistence.snapshot-store.local-delete-fail.class = "akka.persistence.SnapshotFailureRobustnessSpec$DeleteFailingLocalSnapshotStore"
"""))) with ImplicitSender {
import SnapshotFailureRobustnessSpec._
@ -95,5 +132,37 @@ class SnapshotFailureRobustnessSpec extends PersistenceSpec(PersistenceSpec.conf
EventFilter.error(start = "Error loading snapshot [")))
}
}
"receive failure message when deleting a single snapshot fails" in {
val p = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, testActor))
val persistenceId = name
expectMsg(RecoveryCompleted)
p ! Cmd("hello")
expectMsg(1)
p ! DeleteSnapshot(1)
expectMsgPF() {
case DeleteSnapshotFailure(SnapshotMetadata(`persistenceId`, 1, timestamp), cause)
// ok, expected failure
cause.getMessage should include("Failed to delete")
}
}
"receive failure message when bulk deleting snapshot fails" in {
val p = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, testActor))
val persistenceId = name
expectMsg(RecoveryCompleted)
p ! Cmd("hello")
expectMsg(1)
p ! Cmd("hola")
expectMsg(2)
val criteria = SnapshotSelectionCriteria(maxSequenceNr = 10)
p ! DeleteSnapshots(criteria)
expectMsgPF() {
case DeleteSnapshotsFailure(criteria, cause)
// ok, expected failure
cause.getMessage should include("Failed to delete")
}
}
}
}

View file

@ -1,7 +1,7 @@
package akka.persistence
import akka.actor.{ Props, Actor, ActorRef }
import akka.testkit.{ TestProbe, ImplicitSender, AkkaSpec }
import akka.actor.{ ActorLogging, ActorRef, Props }
import akka.testkit.ImplicitSender
object SnapshotRecoveryLocalStoreSpec {
val persistenceId = "europe"
@ -21,13 +21,14 @@ object SnapshotRecoveryLocalStoreSpec {
}
}
class LoadSnapshotTestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) {
class LoadSnapshotTestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name)
with ActorLogging {
def receiveCommand = {
case _
}
def receiveRecover = {
case SnapshotOffer(md, s) probe ! ((md, s))
case other probe ! other
case other probe ! other
}
override def preStart() = ()
}
@ -45,7 +46,6 @@ class SnapshotRecoveryLocalStoreSpec extends PersistenceSpec(PersistenceSpec.con
persistentActor1 ! TakeSnapshot
persistentActor2 ! TakeSnapshot
expectMsgAllOf(0L, 0L)
}
"A persistent actor which is persisted at the same time as another actor whose persistenceId is an extension of the first " must {
@ -54,11 +54,7 @@ class SnapshotRecoveryLocalStoreSpec extends PersistenceSpec(PersistenceSpec.con
val recoveringActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], persistenceId, testActor))
recoveringActor ! Recover()
expectMsgPF() {
case (SnapshotMetadata(pid, seqNo, timestamp), state)
pid should ===(persistenceId)
}
expectMsgPF() { case SnapshotOffer(SnapshotMetadata(`persistenceId`, seqNo, timestamp), state) }
expectMsg(RecoveryCompleted)
}

View file

@ -31,9 +31,9 @@ object SnapshotSpec {
class LoadSnapshotTestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) {
override def receiveRecover: Receive = {
case payload: String probe ! s"${payload}-${lastSequenceNr}"
case SnapshotOffer(md, s) probe ! ((md, s))
case other probe ! other
case payload: String probe ! s"${payload}-${lastSequenceNr}"
case offer @ SnapshotOffer(md, s) probe ! offer
case other probe ! other
}
override def receiveCommand = {
@ -42,8 +42,8 @@ object SnapshotSpec {
persist(payload) { _
probe ! s"${payload}-${lastSequenceNr}"
}
case SnapshotOffer(md, s) probe ! ((md, s))
case other probe ! other
case offer @ SnapshotOffer(md, s) probe ! offer
case other probe ! other
}
override def preStart() = ()
}
@ -54,7 +54,7 @@ object SnapshotSpec {
class DeleteSnapshotTestPersistentActor(name: String, probe: ActorRef) extends LoadSnapshotTestPersistentActor(name, probe) {
override def receiveCommand = receiveDelete orElse super.receiveCommand
def receiveDelete: Receive = {
case Delete1(metadata) deleteSnapshot(metadata.sequenceNr, metadata.timestamp)
case Delete1(metadata) deleteSnapshot(metadata.sequenceNr)
case DeleteN(criteria) deleteSnapshots(criteria)
}
}
@ -88,7 +88,7 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn
persistentActor ! Recover()
expectMsgPF() {
case (SnapshotMetadata(`persistenceId`, 4, timestamp), state)
case SnapshotOffer(SnapshotMetadata(`persistenceId`, 4, timestamp), state)
state should ===(List("a-1", "b-2", "c-3", "d-4").reverse)
timestamp should be > (0L)
}
@ -103,7 +103,7 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn
persistentActor ! Recover(toSequenceNr = 3)
expectMsgPF() {
case (SnapshotMetadata(`persistenceId`, 2, timestamp), state)
case SnapshotOffer(SnapshotMetadata(`persistenceId`, 2, timestamp), state)
state should ===(List("a-1", "b-2").reverse)
timestamp should be > (0L)
}
@ -118,7 +118,7 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn
persistentActor ! "done"
expectMsgPF() {
case (SnapshotMetadata(`persistenceId`, 4, timestamp), state)
case SnapshotOffer(SnapshotMetadata(`persistenceId`, 4, timestamp), state)
state should ===(List("a-1", "b-2", "c-3", "d-4").reverse)
timestamp should be > (0L)
}
@ -132,7 +132,7 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn
persistentActor ! Recover(fromSnapshot = SnapshotSelectionCriteria(maxSequenceNr = 2))
expectMsgPF() {
case (SnapshotMetadata(`persistenceId`, 2, timestamp), state)
case SnapshotOffer(SnapshotMetadata(`persistenceId`, 2, timestamp), state)
state should ===(List("a-1", "b-2").reverse)
timestamp should be > (0L)
}
@ -149,7 +149,7 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn
persistentActor ! Recover(fromSnapshot = SnapshotSelectionCriteria(maxSequenceNr = 2), toSequenceNr = 3)
expectMsgPF() {
case (SnapshotMetadata(`persistenceId`, 2, timestamp), state)
case SnapshotOffer(SnapshotMetadata(`persistenceId`, 2, timestamp), state)
state should ===(List("a-1", "b-2").reverse)
timestamp should be > (0L)
}
@ -166,7 +166,7 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn
expectMsg("c-3")
expectMsg(RecoveryCompleted)
}
"support single message deletions" in {
"support single snapshot deletions" in {
val deleteProbe = TestProbe()
val persistentActor1 = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, testActor))
@ -178,8 +178,8 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn
persistentActor1 ! Recover(toSequenceNr = 4)
persistentActor1 ! "done"
val metadata = expectMsgPF() {
case (md @ SnapshotMetadata(`persistenceId`, 4, _), state)
val metadata = expectMsgPF(hint = "" + SnapshotOffer(SnapshotMetadata(persistenceId, 4, 0), null)) {
case SnapshotOffer(md @ SnapshotMetadata(`persistenceId`, 4, _), state)
state should ===(List("a-1", "b-2", "c-3", "d-4").reverse)
md
}
@ -188,13 +188,17 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn
persistentActor1 ! Delete1(metadata)
deleteProbe.expectMsgType[DeleteSnapshot]
expectMsgPF(hint = "" + DeleteSnapshotSuccess(SnapshotMetadata(`persistenceId`, 4, 0))) {
case m @ DeleteSnapshotSuccess(SnapshotMetadata(`persistenceId`, 4, _))
info("success = " + m)
}
// recover persistentActor from 2nd snapshot (3rd was deleted) plus replayed messages
val persistentActor2 = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, testActor))
persistentActor2 ! Recover(toSequenceNr = 4)
expectMsgPF() {
case (md @ SnapshotMetadata(`persistenceId`, 2, _), state)
expectMsgPF(hint = "" + SnapshotOffer(SnapshotMetadata(`persistenceId`, 2, 0), null)) {
case SnapshotOffer(md @ SnapshotMetadata(`persistenceId`, 2, _), state)
state should ===(List("a-1", "b-2").reverse)
md
}
@ -202,7 +206,7 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn
expectMsg("d-4")
expectMsg(RecoveryCompleted)
}
"support bulk message deletions" in {
"support bulk snapshot deletions" in {
val deleteProbe = TestProbe()
val persistentActor1 = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, testActor))
@ -212,13 +216,15 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn
// recover persistentActor and the delete first three (= all) snapshots
persistentActor1 ! Recover(toSequenceNr = 4)
persistentActor1 ! DeleteN(SnapshotSelectionCriteria(maxSequenceNr = 4))
val criteria = SnapshotSelectionCriteria(maxSequenceNr = 4)
persistentActor1 ! DeleteN(criteria)
expectMsgPF() {
case (md @ SnapshotMetadata(`persistenceId`, 4, _), state)
case SnapshotOffer(md @ SnapshotMetadata(`persistenceId`, 4, _), state)
state should ===(List("a-1", "b-2", "c-3", "d-4").reverse)
}
expectMsg(RecoveryCompleted)
deleteProbe.expectMsgType[DeleteSnapshots]
expectMsgPF() { case DeleteSnapshotsSuccess(`criteria`) }
// recover persistentActor from replayed messages (all snapshots deleted)
val persistentActor2 = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, testActor))

View file

@ -5,6 +5,7 @@
package doc;
//#plugin-imports
import akka.dispatch.Futures;
import akka.persistence.*;
import akka.persistence.journal.japi.*;
import akka.persistence.snapshot.japi.*;
@ -69,11 +70,13 @@ public class LambdaPersistencePluginDocTest {
}
@Override
public void doDelete(SnapshotMetadata metadata) throws Exception {
public Future<Void> doDelete(SnapshotMetadata metadata) throws Exception {
return Futures.successful(null);
}
@Override
public void doDelete(String persistenceId, SnapshotSelectionCriteria criteria) throws Exception {
public Future<Void> doDelete(String persistenceId, SnapshotSelectionCriteria criteria) throws Exception {
return Futures.successful(null);
}
}