From 63baaf1b2b3d0fc6c1971307fd1e66d74106c968 Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Wed, 3 Jun 2015 15:56:00 +0200 Subject: [PATCH 1/3] !per #17586 async snapshot delete and remove timestamp from deleteSnapshot() --- .../persistence/PersistencePluginDocTest.java | 7 +- ...e-persistence-experimental-2.3.x-2.4.x.rst | 15 +++- .../PersistencePluginDocSpec.scala | 4 +- .../local/LocalSnapshotStoreSpec.scala | 2 +- .../journal/japi/SyncWritePlugin.java | 1 + .../snapshot/japi/SnapshotStorePlugin.java | 4 +- .../akka/persistence/SnapshotProtocol.scala | 40 ++++++++- .../scala/akka/persistence/Snapshotter.scala | 22 +++-- .../journal/japi/SyncWriteJournal.scala | 7 +- .../persistence/snapshot/SnapshotStore.scala | 41 +++++++--- .../snapshot/japi/SnapshotStore.scala | 14 ++-- .../snapshot/local/LocalSnapshotStore.scala | 65 ++++++++++----- .../SnapshotFailureRobustnessSpec.scala | 81 +++++++++++++++++-- .../SnapshotRecoveryLocalStoreSpec.scala | 18 ++--- .../scala/akka/persistence/SnapshotSpec.scala | 44 +++++----- .../doc/LambdaPersistencePluginDocTest.java | 7 +- 16 files changed, 280 insertions(+), 92 deletions(-) diff --git a/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java index a6f78eeae4..1e8da37365 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java @@ -8,6 +8,7 @@ import java.io.File; import java.util.ArrayList; import java.util.List; import akka.actor.*; +import akka.dispatch.Futures; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import org.iq80.leveldb.util.FileUtils; @@ -76,11 +77,13 @@ public class PersistencePluginDocTest { } @Override - public void doDelete(SnapshotMetadata metadata) throws Exception { + public Future doDelete(SnapshotMetadata metadata) throws Exception { + return Futures.successful(null); } @Override - public void doDelete(String persistenceId, SnapshotSelectionCriteria criteria) throws Exception { + public Future doDelete(String persistenceId, SnapshotSelectionCriteria criteria) throws Exception { + return Futures.successful(null); } } diff --git a/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst index a06a4d488a..864c47ada4 100644 --- a/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst @@ -8,7 +8,6 @@ Migration Guide Akka Persistence (experimental) 2.3.3 to 2.3.4 (and 2.4.x) is provided for Persistence while under the *experimental* flag. The goal of this phase is to gather user feedback before we freeze the APIs in a major release. - defer renamed to deferAsync =========================== The ``defer`` method in ``PersistentActor`` was renamed to ``deferAsync`` as it matches the semantics @@ -204,3 +203,17 @@ To continue using LevelDB based persistence plugins it is now required for relat to include an additional explicit dependency declaration for the LevelDB artifacts. This change allows production akka deployments to avoid need for the LevelDB provisioning. Please see persistence extension ``reference.conf`` for details. + +SnapshotStore: Snapshots can now be deleted asynchronously (and report failures) +================================================================================ +Previously the ``SnapshotStore`` plugin SPI did not allow for asynchronous deletion of snapshots, +and failures of deleting a snapshot may have been even silently ignored. + +Now ``SnapshotStore``s must return a ``Future`` representing the deletion of the snapshot. +If this future completes successfully the ``PersistentActor`` which initiated the snapshotting will +be notified via an ``DeleteSnapshotSuccess`` message. If the deletion fails for some reason a ``DeleteSnapshotFailure`` +will be sent to the actor instead. + +For ``criteria`` based deletion of snapshots (``def deleteSnapshots(criteria: SnapshotSelectionCriteria)``) equivalent +``DeleteSnapshotsSuccess`` and ``DeleteSnapshotsFailure`` messages are sent, which contain the specified criteria, +instead of ``SnapshotMetadata`` as is the case with the single snapshot deletion messages. \ No newline at end of file diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala index eb8479536b..3a3de25018 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala @@ -139,8 +139,8 @@ class MySnapshotStore extends SnapshotStore { criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = ??? def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = ??? def saved(metadata: SnapshotMetadata): Unit = ??? - def delete(metadata: SnapshotMetadata): Unit = ??? - def delete(persistenceId: String, criteria: SnapshotSelectionCriteria): Unit = ??? + def deleteAsync(metadata: SnapshotMetadata): Future[Unit] = ??? + def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = ??? } object PersistenceTCKDoc { diff --git a/akka-persistence-tck/src/test/scala/akka/persistence/snapshot/local/LocalSnapshotStoreSpec.scala b/akka-persistence-tck/src/test/scala/akka/persistence/snapshot/local/LocalSnapshotStoreSpec.scala index db5991db04..998696e693 100644 --- a/akka-persistence-tck/src/test/scala/akka/persistence/snapshot/local/LocalSnapshotStoreSpec.scala +++ b/akka-persistence-tck/src/test/scala/akka/persistence/snapshot/local/LocalSnapshotStoreSpec.scala @@ -12,4 +12,4 @@ class LocalSnapshotStoreSpec extends SnapshotStoreSpec( akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" akka.persistence.snapshot-store.local.dir = "target/snapshots" """)) - with PluginCleanup + with PluginCleanup \ No newline at end of file diff --git a/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java b/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java index b595aac4bb..87e2d2a59f 100644 --- a/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java +++ b/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java @@ -5,6 +5,7 @@ package akka.persistence.journal.japi; import akka.persistence.*; +import scala.concurrent.Future; interface SyncWritePlugin { //#sync-write-plugin-api diff --git a/akka-persistence/src/main/java/akka/persistence/snapshot/japi/SnapshotStorePlugin.java b/akka-persistence/src/main/java/akka/persistence/snapshot/japi/SnapshotStorePlugin.java index ec43378cbd..24ec4d8508 100644 --- a/akka-persistence/src/main/java/akka/persistence/snapshot/japi/SnapshotStorePlugin.java +++ b/akka-persistence/src/main/java/akka/persistence/snapshot/japi/SnapshotStorePlugin.java @@ -45,7 +45,7 @@ interface SnapshotStorePlugin { * @param metadata * snapshot metadata. */ - void doDelete(SnapshotMetadata metadata) throws Exception; + Future doDelete(SnapshotMetadata metadata) throws Exception; /** * Java API, Plugin API: deletes all snapshots matching `criteria`. @@ -55,6 +55,6 @@ interface SnapshotStorePlugin { * @param criteria * selection criteria for deleting. */ - void doDelete(String persistenceId, SnapshotSelectionCriteria criteria) throws Exception; + Future doDelete(String persistenceId, SnapshotSelectionCriteria criteria) throws Exception; //#snapshot-store-plugin-api } diff --git a/akka-persistence/src/main/scala/akka/persistence/SnapshotProtocol.scala b/akka-persistence/src/main/scala/akka/persistence/SnapshotProtocol.scala index f7f35e0308..526e2b1750 100644 --- a/akka-persistence/src/main/scala/akka/persistence/SnapshotProtocol.scala +++ b/akka-persistence/src/main/scala/akka/persistence/SnapshotProtocol.scala @@ -10,7 +10,7 @@ package akka.persistence * * @param persistenceId id of persistent actor from which the snapshot was taken. * @param sequenceNr sequence number at which the snapshot was taken. - * @param timestamp time at which the snapshot was saved. + * @param timestamp time at which the snapshot was saved, defaults to 0 when unknown. */ @SerialVersionUID(1L) // //#snapshot-metadata @@ -26,6 +26,24 @@ final case class SnapshotMetadata(persistenceId: String, sequenceNr: Long, times final case class SaveSnapshotSuccess(metadata: SnapshotMetadata) extends SnapshotProtocol.Response +/** + * Sent to a [[PersistentActor]] after successful deletion of a snapshot. + * + * @param metadata snapshot metadata. + */ +@SerialVersionUID(1L) +final case class DeleteSnapshotSuccess(metadata: SnapshotMetadata) + extends SnapshotProtocol.Response + +/** + * Sent to a [[PersistentActor]] after successful deletion of specified range of snapshots. + * + * @param criteria snapshot selection criteria. + */ +@SerialVersionUID(1L) +final case class DeleteSnapshotsSuccess(criteria: SnapshotSelectionCriteria) + extends SnapshotProtocol.Response + /** * Sent to a [[PersistentActor]] after failed saving of a snapshot. * @@ -36,6 +54,26 @@ final case class SaveSnapshotSuccess(metadata: SnapshotMetadata) final case class SaveSnapshotFailure(metadata: SnapshotMetadata, cause: Throwable) extends SnapshotProtocol.Response +/** + * Sent to a [[PersistentActor]] after failed deletion of a snapshot. + * + * @param metadata snapshot metadata. + * @param cause failure cause. + */ +@SerialVersionUID(1L) +final case class DeleteSnapshotFailure(metadata: SnapshotMetadata, cause: Throwable) + extends SnapshotProtocol.Response + +/** + * Sent to a [[PersistentActor]] after failed deletion of a range of snapshots. + * + * @param criteria snapshot selection criteria. + * @param cause failure cause. + */ +@SerialVersionUID(1L) +final case class DeleteSnapshotsFailure(criteria: SnapshotSelectionCriteria, cause: Throwable) + extends SnapshotProtocol.Response + /** * Offers a [[PersistentActor]] a previously saved `snapshot` during recovery. This offer is received * before any further replayed messages. diff --git a/akka-persistence/src/main/scala/akka/persistence/Snapshotter.scala b/akka-persistence/src/main/scala/akka/persistence/Snapshotter.scala index d88aeffcab..31b9312723 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Snapshotter.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Snapshotter.scala @@ -25,26 +25,38 @@ trait Snapshotter extends Actor { */ def snapshotSequenceNr: Long + /** + * Instructs the snapshot store to load the specified snapshot and send it via an [[SnapshotOffer]] + * to the running [[PersistentActor]]. + */ def loadSnapshot(persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long) = snapshotStore ! LoadSnapshot(persistenceId, criteria, toSequenceNr) /** - * Saves a `snapshot` of this snapshotter's state. If saving succeeds, this snapshotter will receive a - * [[SaveSnapshotSuccess]] message, otherwise a [[SaveSnapshotFailure]] message. + * Saves a `snapshot` of this snapshotter's state. + * + * The [[PersistentActor]] will be notified about the success or failure of this + * via an [[SaveSnapshotSuccess]] or [[SaveSnapshotFailure]] message. */ def saveSnapshot(snapshot: Any): Unit = { snapshotStore ! SaveSnapshot(SnapshotMetadata(snapshotterId, snapshotSequenceNr), snapshot) } /** - * Deletes a snapshot identified by `sequenceNr` and `timestamp`. + * Deletes the snapshot identified by `sequenceNr`. + * + * The [[PersistentActor]] will be notified about the status of the deletion + * via an [[DeleteSnapshotSuccess]] or [[DeleteSnapshotFailure]] message. */ - def deleteSnapshot(sequenceNr: Long, timestamp: Long): Unit = { - snapshotStore ! DeleteSnapshot(SnapshotMetadata(snapshotterId, sequenceNr, timestamp)) + def deleteSnapshot(sequenceNr: Long): Unit = { + snapshotStore ! DeleteSnapshot(SnapshotMetadata(snapshotterId, sequenceNr)) } /** * Deletes all snapshots matching `criteria`. + * + * The [[PersistentActor]] will be notified about the status of the deletion + * via an [[DeleteSnapshotsSuccess]] or [[DeleteSnapshotsFailure]] message. */ def deleteSnapshots(criteria: SnapshotSelectionCriteria): Unit = { snapshotStore ! DeleteSnapshots(snapshotterId, criteria) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala index c3760d756b..eb774e5b0d 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala @@ -10,13 +10,16 @@ import scala.collection.JavaConverters._ import akka.persistence._ import akka.persistence.journal.{ SyncWriteJournal ⇒ SSyncWriteJournal } +import scala.concurrent.{ Await, Future } +import scala.concurrent.duration._ + /** * Java API: abstract journal, optimized for synchronous writes. */ abstract class SyncWriteJournal extends AsyncRecovery with SSyncWriteJournal with SyncWritePlugin { - final def writeMessages(messages: immutable.Seq[PersistentRepr]) = + final def writeMessages(messages: immutable.Seq[PersistentRepr]): Unit = doWriteMessages(messages.asJava) - final def deleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean) = + final def deleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Unit = doDeleteMessagesTo(persistenceId, toSequenceNr, permanent) } diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala index ecb1dec636..5c01614282 100644 --- a/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala @@ -14,7 +14,7 @@ import akka.persistence._ /** * Abstract snapshot store. */ -trait SnapshotStore extends Actor { +trait SnapshotStore extends Actor with ActorLogging { import SnapshotProtocol._ import context.dispatcher @@ -29,6 +29,7 @@ trait SnapshotStore extends Actor { } recover { case e ⇒ LoadSnapshotResult(None, toSequenceNr) } pipeTo p + case SaveSnapshot(metadata, snapshot) ⇒ val p = sender() val md = metadata.copy(timestamp = System.currentTimeMillis) @@ -37,18 +38,34 @@ trait SnapshotStore extends Actor { } recover { case e ⇒ SaveSnapshotFailure(metadata, e) } to (self, p) + case evt @ SaveSnapshotSuccess(metadata) ⇒ - saved(metadata) - sender() ! evt // sender is persistentActor + try saved(metadata) finally sender() ! evt // sender is persistentActor + case evt @ SaveSnapshotFailure(metadata, _) ⇒ - delete(metadata) - sender() ! evt // sender is persistentActor + try deleteAsync(metadata) finally sender() ! evt // sender is persistentActor + case d @ DeleteSnapshot(metadata) ⇒ - delete(metadata) - if (publish) context.system.eventStream.publish(d) + val p = sender() + deleteAsync(metadata) map { + case _ ⇒ + log.warning("deleting by: " + d) + DeleteSnapshotSuccess(metadata) + } recover { + case e ⇒ DeleteSnapshotFailure(metadata, e) + } pipeTo p onComplete { + case _ if publish ⇒ context.system.eventStream.publish(d) + } + case d @ DeleteSnapshots(persistenceId, criteria) ⇒ - delete(persistenceId, criteria) - if (publish) context.system.eventStream.publish(d) + val p = sender() + deleteAsync(persistenceId, criteria) map { + case _ ⇒ DeleteSnapshotsSuccess(criteria) + } recover { + case e ⇒ DeleteSnapshotsFailure(criteria, e) + } pipeTo p onComplete { + case _ if publish ⇒ context.system.eventStream.publish(d) + } } //#snapshot-store-plugin-api @@ -73,7 +90,7 @@ trait SnapshotStore extends Actor { * * @param metadata snapshot metadata. */ - def saved(metadata: SnapshotMetadata) + def saved(metadata: SnapshotMetadata): Unit /** * Plugin API: deletes the snapshot identified by `metadata`. @@ -81,7 +98,7 @@ trait SnapshotStore extends Actor { * @param metadata snapshot metadata. */ - def delete(metadata: SnapshotMetadata) + def deleteAsync(metadata: SnapshotMetadata): Future[Unit] /** * Plugin API: deletes all snapshots matching `criteria`. @@ -89,6 +106,6 @@ trait SnapshotStore extends Actor { * @param persistenceId id of the persistent actor. * @param criteria selection criteria for deleting. */ - def delete(persistenceId: String, criteria: SnapshotSelectionCriteria) + def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] //#snapshot-store-plugin-api } diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/japi/SnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/japi/SnapshotStore.scala index 20839e4112..bd3ddd033f 100644 --- a/akka-persistence/src/main/scala/akka/persistence/snapshot/japi/SnapshotStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/japi/SnapshotStore.scala @@ -4,12 +4,12 @@ package akka.persistence.snapshot.japi -import scala.concurrent.Future - import akka.japi.{ Option ⇒ JOption } import akka.persistence._ import akka.persistence.snapshot.{ SnapshotStore ⇒ SSnapshotStore } +import scala.concurrent.Future + /** * Java API: abstract snapshot store. */ @@ -22,13 +22,13 @@ abstract class SnapshotStore extends SSnapshotStore with SnapshotStorePlugin { final def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = doSaveAsync(metadata, snapshot).map(Unit.unbox) - final def saved(metadata: SnapshotMetadata) = + final def saved(metadata: SnapshotMetadata): Unit = onSaved(metadata) - final def delete(metadata: SnapshotMetadata) = - doDelete(metadata) + final def delete(metadata: SnapshotMetadata): Future[Unit] = + doDelete(metadata).map(_ ⇒ ()) - final def delete(persistenceId: String, criteria: SnapshotSelectionCriteria) = - doDelete(persistenceId: String, criteria: SnapshotSelectionCriteria) + final def delete(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = + doDelete(persistenceId: String, criteria: SnapshotSelectionCriteria).map(_ ⇒ ()) } diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala index 7b5c8e2792..845cfdccf9 100644 --- a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala @@ -8,17 +8,17 @@ package akka.persistence.snapshot.local import java.io._ import java.net.{ URLDecoder, URLEncoder } +import akka.actor.ActorLogging +import akka.persistence._ +import akka.persistence.serialization._ +import akka.persistence.snapshot._ +import akka.serialization.SerializationExtension +import akka.util.ByteString.UTF_8 + import scala.collection.immutable import scala.concurrent.Future import scala.util._ -import akka.actor.ActorLogging -import akka.persistence._ -import akka.persistence.snapshot._ -import akka.persistence.serialization._ -import akka.serialization.SerializationExtension -import akka.util.ByteString.UTF_8 - /** * INTERNAL API. * @@ -45,7 +45,7 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo // // TODO: make number of loading attempts configurable // - val metadata = snapshotMetadata(persistenceId, criteria).sorted.takeRight(3) + val metadata = snapshotMetadatas(persistenceId, criteria).sorted.takeRight(3) Future(load(metadata))(streamDispatcher) } @@ -58,13 +58,26 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo saving -= metadata } - def delete(metadata: SnapshotMetadata): Unit = { + def deleteAsync(metadata: SnapshotMetadata): Future[Unit] = { saving -= metadata - snapshotFile(metadata).delete() + Future { + // multiple snapshot files here mean that there were multiple snapshots for this seqNr, we delete all of them + // usually snapshot-stores would keep one snapshot per sequenceNr however here in the file-based one we timestamp + // snapshots and allow multiple to be kept around (for the same seqNr) if desired + snapshotFiles(metadata).map(_.delete()) + }(streamDispatcher).map(_ ⇒ ())(streamDispatcher) } - def delete(persistenceId: String, criteria: SnapshotSelectionCriteria) = { - snapshotMetadata(persistenceId, criteria).foreach(delete) + def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = { + val metadatas = snapshotMetadatas(persistenceId, criteria) + Future.sequence { + metadatas.map(deleteAsync) + }(collection.breakOut, streamDispatcher).map(_ ⇒ ())(streamDispatcher) + } + + private def snapshotFiles(metadata: SnapshotMetadata): immutable.Seq[File] = { + // pick all files for this persistenceId and sequenceNr, old journals could have created multiple entries with appended timestamps + snapshotDir.listFiles(new SnapshotSeqNrFilenameFilter(metadata.persistenceId, metadata.sequenceNr)).toVector } @scala.annotation.tailrec @@ -81,7 +94,7 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo protected def save(metadata: SnapshotMetadata, snapshot: Any): Unit = { val tmpFile = withOutputStream(metadata)(serialize(_, Snapshot(snapshot))) - tmpFile.renameTo(snapshotFile(metadata)) + tmpFile.renameTo(snapshotFileForWrite(metadata)) } protected def deserialize(inputStream: InputStream): Snapshot = @@ -91,21 +104,22 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo outputStream.write(serializationExtension.findSerializerFor(snapshot).toBinary(snapshot)) protected def withOutputStream(metadata: SnapshotMetadata)(p: (OutputStream) ⇒ Unit): File = { - val tmpFile = snapshotFile(metadata, extension = "tmp") + val tmpFile = snapshotFileForWrite(metadata, extension = "tmp") withStream(new BufferedOutputStream(new FileOutputStream(tmpFile)), p) tmpFile } private def withInputStream[T](metadata: SnapshotMetadata)(p: (InputStream) ⇒ T): T = - withStream(new BufferedInputStream(new FileInputStream(snapshotFile(metadata))), p) + withStream(new BufferedInputStream(new FileInputStream(snapshotFileForWrite(metadata))), p) private def withStream[A <: Closeable, B](stream: A, p: A ⇒ B): B = try { p(stream) } finally { stream.close() } - private def snapshotFile(metadata: SnapshotMetadata, extension: String = ""): File = + /** Only by persistenceId and sequenceNr, timestamp is informational - accomodates for 2.13.x series files */ + private def snapshotFileForWrite(metadata: SnapshotMetadata, extension: String = ""): File = new File(snapshotDir, s"snapshot-${URLEncoder.encode(metadata.persistenceId, UTF_8)}-${metadata.sequenceNr}-${metadata.timestamp}${extension}") - private def snapshotMetadata(persistenceId: String, criteria: SnapshotSelectionCriteria): immutable.Seq[SnapshotMetadata] = { + private def snapshotMetadatas(persistenceId: String, criteria: SnapshotSelectionCriteria): immutable.Seq[SnapshotMetadata] = { val files = snapshotDir.listFiles(new SnapshotFilenameFilter(persistenceId)) if (files eq null) Nil // if the dir was removed else files.map(_.getName).collect { @@ -128,11 +142,24 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo dir } - private class SnapshotFilenameFilter(persistenceId: String) extends FilenameFilter { - def accept(dir: File, name: String): Boolean = + private final class SnapshotFilenameFilter(persistenceId: String) extends FilenameFilter { + def accept(dir: File, name: String): Boolean = { name match { case FilenamePattern(pid, snr, tms) ⇒ pid.equals(URLEncoder.encode(persistenceId)) case _ ⇒ false } + } + } + + private final class SnapshotSeqNrFilenameFilter(persistenceId: String, sequenceNr: Long) extends FilenameFilter { + private final def matches(pid: String, snr: String): Boolean = + pid.equals(URLEncoder.encode(persistenceId)) && Try(snr.toLong == sequenceNr).getOrElse(false) + + def accept(dir: File, name: String): Boolean = + name match { + case FilenamePattern(pid, snr, tms) ⇒ matches(pid, snr) + case _ ⇒ false + } + } } diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala index acc3680a97..a1ee7b3d1c 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala @@ -4,18 +4,22 @@ package akka.persistence -import akka.actor.{ Props, ActorRef } -import akka.testkit.{ TestEvent, EventFilter, ImplicitSender, AkkaSpec } -import scala.concurrent.duration._ -import akka.persistence.snapshot.local.LocalSnapshotStore -import akka.persistence.serialization.Snapshot -import akka.event.Logging +import java.io.IOException +import akka.actor.{ ActorRef, Props } +import akka.event.Logging +import akka.persistence.snapshot.local.LocalSnapshotStore +import akka.testkit.{ EventFilter, ImplicitSender, TestEvent } + +import scala.concurrent.Future +import scala.concurrent.duration._ import scala.language.postfixOps object SnapshotFailureRobustnessSpec { case class Cmd(payload: String) + case class DeleteSnapshot(seqNr: Int) + case class DeleteSnapshots(criteria: SnapshotSelectionCriteria) class SaveSnapshotTestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) { override def receiveRecover: Receive = { @@ -30,6 +34,26 @@ object SnapshotFailureRobustnessSpec { } } + class DeleteSnapshotTestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) { + + // TODO do we call it "snapshot store" or "snapshot plugin", small inconsistency here + override def snapshotPluginId: String = + "akka.persistence.snapshot-store.local-delete-fail" + + override def receiveRecover: Receive = { + case SnapshotOffer(md, s) ⇒ probe ! ((md, s)) + case other ⇒ probe ! other + } + + override def receiveCommand = { + case Cmd(payload) ⇒ persist(payload)(_ ⇒ saveSnapshot(payload)) + case DeleteSnapshot(seqNr) ⇒ deleteSnapshot(seqNr) + case DeleteSnapshots(crit) ⇒ deleteSnapshots(crit) + case SaveSnapshotSuccess(md) ⇒ probe ! md.sequenceNr + case other ⇒ probe ! other + } + } + class LoadSnapshotTestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) { override def receiveRecover: Receive = { case SnapshotOffer(md, s) ⇒ probe ! ((md, s)) @@ -56,11 +80,24 @@ object SnapshotFailureRobustnessSpec { } else super.save(metadata, snapshot) } } + + class DeleteFailingLocalSnapshotStore extends LocalSnapshotStore { + override def deleteAsync(metadata: SnapshotMetadata): Future[Unit] = { + super.deleteAsync(metadata) // we actually delete it properly, but act as if it failed + Future.failed(new IOException("Failed to delete snapshot for some reason!")) + } + + override def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = { + super.deleteAsync(persistenceId, criteria) // we actually delete it properly, but act as if it failed + Future.failed(new IOException("Failed to delete snapshot for some reason!")) + } + } } class SnapshotFailureRobustnessSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "SnapshotFailureRobustnessSpec", serialization = "off", extraConfig = Some( """ akka.persistence.snapshot-store.local.class = "akka.persistence.SnapshotFailureRobustnessSpec$FailingLocalSnapshotStore" + akka.persistence.snapshot-store.local-delete-fail.class = "akka.persistence.SnapshotFailureRobustnessSpec$DeleteFailingLocalSnapshotStore" """))) with ImplicitSender { import SnapshotFailureRobustnessSpec._ @@ -95,5 +132,37 @@ class SnapshotFailureRobustnessSpec extends PersistenceSpec(PersistenceSpec.conf EventFilter.error(start = "Error loading snapshot ["))) } } + + "receive failure message when deleting a single snapshot fails" in { + val p = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, testActor)) + val persistenceId = name + + expectMsg(RecoveryCompleted) + p ! Cmd("hello") + expectMsg(1) + p ! DeleteSnapshot(1) + expectMsgPF() { + case DeleteSnapshotFailure(SnapshotMetadata(`persistenceId`, 1, timestamp), cause) ⇒ + // ok, expected failure + cause.getMessage should include("Failed to delete") + } + } + "receive failure message when bulk deleting snapshot fails" in { + val p = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, testActor)) + val persistenceId = name + + expectMsg(RecoveryCompleted) + p ! Cmd("hello") + expectMsg(1) + p ! Cmd("hola") + expectMsg(2) + val criteria = SnapshotSelectionCriteria(maxSequenceNr = 10) + p ! DeleteSnapshots(criteria) + expectMsgPF() { + case DeleteSnapshotsFailure(criteria, cause) ⇒ + // ok, expected failure + cause.getMessage should include("Failed to delete") + } + } } } diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotRecoveryLocalStoreSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotRecoveryLocalStoreSpec.scala index 961c0f8c45..776adf4d5d 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotRecoveryLocalStoreSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotRecoveryLocalStoreSpec.scala @@ -1,7 +1,7 @@ package akka.persistence -import akka.actor.{ Props, Actor, ActorRef } -import akka.testkit.{ TestProbe, ImplicitSender, AkkaSpec } +import akka.actor.{ ActorLogging, ActorRef, Props } +import akka.testkit.ImplicitSender object SnapshotRecoveryLocalStoreSpec { val persistenceId = "europe" @@ -21,13 +21,14 @@ object SnapshotRecoveryLocalStoreSpec { } } - class LoadSnapshotTestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) { + class LoadSnapshotTestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) + with ActorLogging { + def receiveCommand = { case _ ⇒ } def receiveRecover = { - case SnapshotOffer(md, s) ⇒ probe ! ((md, s)) - case other ⇒ probe ! other + case other ⇒ probe ! other } override def preStart() = () } @@ -45,7 +46,6 @@ class SnapshotRecoveryLocalStoreSpec extends PersistenceSpec(PersistenceSpec.con persistentActor1 ! TakeSnapshot persistentActor2 ! TakeSnapshot expectMsgAllOf(0L, 0L) - } "A persistent actor which is persisted at the same time as another actor whose persistenceId is an extension of the first " must { @@ -54,11 +54,7 @@ class SnapshotRecoveryLocalStoreSpec extends PersistenceSpec(PersistenceSpec.con val recoveringActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], persistenceId, testActor)) recoveringActor ! Recover() - - expectMsgPF() { - case (SnapshotMetadata(pid, seqNo, timestamp), state) ⇒ - pid should ===(persistenceId) - } + expectMsgPF() { case SnapshotOffer(SnapshotMetadata(`persistenceId`, seqNo, timestamp), state) ⇒ } expectMsg(RecoveryCompleted) } diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala index aae856b423..89c07f7b67 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala @@ -31,9 +31,9 @@ object SnapshotSpec { class LoadSnapshotTestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) { override def receiveRecover: Receive = { - case payload: String ⇒ probe ! s"${payload}-${lastSequenceNr}" - case SnapshotOffer(md, s) ⇒ probe ! ((md, s)) - case other ⇒ probe ! other + case payload: String ⇒ probe ! s"${payload}-${lastSequenceNr}" + case offer @ SnapshotOffer(md, s) ⇒ probe ! offer + case other ⇒ probe ! other } override def receiveCommand = { @@ -42,8 +42,8 @@ object SnapshotSpec { persist(payload) { _ ⇒ probe ! s"${payload}-${lastSequenceNr}" } - case SnapshotOffer(md, s) ⇒ probe ! ((md, s)) - case other ⇒ probe ! other + case offer @ SnapshotOffer(md, s) ⇒ probe ! offer + case other ⇒ probe ! other } override def preStart() = () } @@ -54,7 +54,7 @@ object SnapshotSpec { class DeleteSnapshotTestPersistentActor(name: String, probe: ActorRef) extends LoadSnapshotTestPersistentActor(name, probe) { override def receiveCommand = receiveDelete orElse super.receiveCommand def receiveDelete: Receive = { - case Delete1(metadata) ⇒ deleteSnapshot(metadata.sequenceNr, metadata.timestamp) + case Delete1(metadata) ⇒ deleteSnapshot(metadata.sequenceNr) case DeleteN(criteria) ⇒ deleteSnapshots(criteria) } } @@ -88,7 +88,7 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn persistentActor ! Recover() expectMsgPF() { - case (SnapshotMetadata(`persistenceId`, 4, timestamp), state) ⇒ + case SnapshotOffer(SnapshotMetadata(`persistenceId`, 4, timestamp), state) ⇒ state should ===(List("a-1", "b-2", "c-3", "d-4").reverse) timestamp should be > (0L) } @@ -103,7 +103,7 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn persistentActor ! Recover(toSequenceNr = 3) expectMsgPF() { - case (SnapshotMetadata(`persistenceId`, 2, timestamp), state) ⇒ + case SnapshotOffer(SnapshotMetadata(`persistenceId`, 2, timestamp), state) ⇒ state should ===(List("a-1", "b-2").reverse) timestamp should be > (0L) } @@ -118,7 +118,7 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn persistentActor ! "done" expectMsgPF() { - case (SnapshotMetadata(`persistenceId`, 4, timestamp), state) ⇒ + case SnapshotOffer(SnapshotMetadata(`persistenceId`, 4, timestamp), state) ⇒ state should ===(List("a-1", "b-2", "c-3", "d-4").reverse) timestamp should be > (0L) } @@ -132,7 +132,7 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn persistentActor ! Recover(fromSnapshot = SnapshotSelectionCriteria(maxSequenceNr = 2)) expectMsgPF() { - case (SnapshotMetadata(`persistenceId`, 2, timestamp), state) ⇒ + case SnapshotOffer(SnapshotMetadata(`persistenceId`, 2, timestamp), state) ⇒ state should ===(List("a-1", "b-2").reverse) timestamp should be > (0L) } @@ -149,7 +149,7 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn persistentActor ! Recover(fromSnapshot = SnapshotSelectionCriteria(maxSequenceNr = 2), toSequenceNr = 3) expectMsgPF() { - case (SnapshotMetadata(`persistenceId`, 2, timestamp), state) ⇒ + case SnapshotOffer(SnapshotMetadata(`persistenceId`, 2, timestamp), state) ⇒ state should ===(List("a-1", "b-2").reverse) timestamp should be > (0L) } @@ -166,7 +166,7 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn expectMsg("c-3") expectMsg(RecoveryCompleted) } - "support single message deletions" in { + "support single snapshot deletions" in { val deleteProbe = TestProbe() val persistentActor1 = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, testActor)) @@ -178,8 +178,8 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn persistentActor1 ! Recover(toSequenceNr = 4) persistentActor1 ! "done" - val metadata = expectMsgPF() { - case (md @ SnapshotMetadata(`persistenceId`, 4, _), state) ⇒ + val metadata = expectMsgPF(hint = "" + SnapshotOffer(SnapshotMetadata(persistenceId, 4, 0), null)) { + case SnapshotOffer(md @ SnapshotMetadata(`persistenceId`, 4, _), state) ⇒ state should ===(List("a-1", "b-2", "c-3", "d-4").reverse) md } @@ -188,13 +188,17 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn persistentActor1 ! Delete1(metadata) deleteProbe.expectMsgType[DeleteSnapshot] + expectMsgPF(hint = "" + DeleteSnapshotSuccess(SnapshotMetadata(`persistenceId`, 4, 0))) { + case m @ DeleteSnapshotSuccess(SnapshotMetadata(`persistenceId`, 4, _)) ⇒ + info("success = " + m) + } // recover persistentActor from 2nd snapshot (3rd was deleted) plus replayed messages val persistentActor2 = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, testActor)) persistentActor2 ! Recover(toSequenceNr = 4) - expectMsgPF() { - case (md @ SnapshotMetadata(`persistenceId`, 2, _), state) ⇒ + expectMsgPF(hint = "" + SnapshotOffer(SnapshotMetadata(`persistenceId`, 2, 0), null)) { + case SnapshotOffer(md @ SnapshotMetadata(`persistenceId`, 2, _), state) ⇒ state should ===(List("a-1", "b-2").reverse) md } @@ -202,7 +206,7 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn expectMsg("d-4") expectMsg(RecoveryCompleted) } - "support bulk message deletions" in { + "support bulk snapshot deletions" in { val deleteProbe = TestProbe() val persistentActor1 = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, testActor)) @@ -212,13 +216,15 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn // recover persistentActor and the delete first three (= all) snapshots persistentActor1 ! Recover(toSequenceNr = 4) - persistentActor1 ! DeleteN(SnapshotSelectionCriteria(maxSequenceNr = 4)) + val criteria = SnapshotSelectionCriteria(maxSequenceNr = 4) + persistentActor1 ! DeleteN(criteria) expectMsgPF() { - case (md @ SnapshotMetadata(`persistenceId`, 4, _), state) ⇒ + case SnapshotOffer(md @ SnapshotMetadata(`persistenceId`, 4, _), state) ⇒ state should ===(List("a-1", "b-2", "c-3", "d-4").reverse) } expectMsg(RecoveryCompleted) deleteProbe.expectMsgType[DeleteSnapshots] + expectMsgPF() { case DeleteSnapshotsSuccess(`criteria`) ⇒ } // recover persistentActor from replayed messages (all snapshots deleted) val persistentActor2 = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, testActor)) diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java index b28046041e..fd7b86484f 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java @@ -5,6 +5,7 @@ package doc; //#plugin-imports +import akka.dispatch.Futures; import akka.persistence.*; import akka.persistence.journal.japi.*; import akka.persistence.snapshot.japi.*; @@ -69,11 +70,13 @@ public class LambdaPersistencePluginDocTest { } @Override - public void doDelete(SnapshotMetadata metadata) throws Exception { + public Future doDelete(SnapshotMetadata metadata) throws Exception { + return Futures.successful(null); } @Override - public void doDelete(String persistenceId, SnapshotSelectionCriteria criteria) throws Exception { + public Future doDelete(String persistenceId, SnapshotSelectionCriteria criteria) throws Exception { + return Futures.successful(null); } } From 541ac83b100f4695aacd66ae785e7c4bf3646fe6 Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Wed, 17 Jun 2015 00:54:36 +0200 Subject: [PATCH 2/3] +str #17751 configurable load attempts for local-snapshot-store --- .../src/main/resources/reference.conf | 4 ++++ .../snapshot/local/LocalSnapshotStore.scala | 15 ++++++++------- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/akka-persistence/src/main/resources/reference.conf b/akka-persistence/src/main/resources/reference.conf index a2820916ff..52eafb5984 100644 --- a/akka-persistence/src/main/resources/reference.conf +++ b/akka-persistence/src/main/resources/reference.conf @@ -116,6 +116,10 @@ akka.persistence.snapshot-store.local { stream-dispatcher = "akka.persistence.dispatchers.default-stream-dispatcher" # Storage location of snapshot files. dir = "snapshots" + # Number load attempts when recovering from the latest snapshot fails + # yet older snapshot files are available. Each recovery attempt will try + # to recover using an older than previously failed-on snapshot file (if any are present). + max-load-attempts = 3 } # LevelDB journal plugin. diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala index 845cfdccf9..134c50a86e 100644 --- a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala @@ -27,7 +27,11 @@ import scala.util._ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLogging { private val FilenamePattern = """^snapshot-(.+)-(\d+)-(\d+)""".r + import akka.util.Helpers._ private val config = context.system.settings.config.getConfig("akka.persistence.snapshot-store.local") + private val maxLoadAttempts = config.getInt("max-load-attempts") + .requiring(_ > 1, "max-load-attempts must be >= 1") + private val streamDispatcher = context.system.dispatchers.lookup(config.getString("stream-dispatcher")) private val dir = new File(config.getString("dir")) @@ -38,14 +42,11 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo // // Heuristics: // - // Select youngest 3 snapshots that match upper bound. This may help in situations - // where saving of a snapshot could not be completed because of a JVM crash. Hence, - // an attempt to load that snapshot will fail but loading an older snapshot may - // succeed. + // Select youngest `maxLoadAttempts` snapshots that match upper bound. + // This may help in situations where saving of a snapshot could not be completed because of a JVM crash. + // Hence, an attempt to load that snapshot will fail but loading an older snapshot may succeed. // - // TODO: make number of loading attempts configurable - // - val metadata = snapshotMetadatas(persistenceId, criteria).sorted.takeRight(3) + val metadata = snapshotMetadatas(persistenceId, criteria).sorted.takeRight(maxLoadAttempts) Future(load(metadata))(streamDispatcher) } From 2a5161ff6f39dc81a671e47e5fd78eecb3ae5da0 Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Wed, 17 Jun 2015 01:23:18 +0200 Subject: [PATCH 3/3] !per #17755 removes the saved callback in plugins and adds receive --- .../src/test/java/akka/actor/JavaAPI.java | 15 ++- .../src/main/scala/akka/japi/JavaAPI.scala | 5 + .../persistence/PersistencePluginDocTest.java | 12 +- akka-docs/rst/java/persistence.rst | 8 +- .../project/migration-guide-2.3.x-2.4.x.rst | 116 ++++++++++++++---- ...e-persistence-experimental-2.3.x-2.4.x.rst | 14 --- .../PersistencePluginDocSpec.scala | 8 +- akka-docs/rst/scala/persistence.rst | 8 +- .../snapshot/japi/SnapshotStorePlugin.java | 20 ++- .../journal/AsyncWriteJournal.scala | 15 ++- .../persistence/journal/AsyncWriteProxy.scala | 29 ++--- .../persistence/snapshot/SnapshotStore.scala | 67 ++++++---- .../snapshot/japi/SnapshotStore.scala | 19 ++- .../snapshot/local/LocalSnapshotStore.scala | 24 ++-- .../scala/akka/persistence/SnapshotSpec.scala | 7 +- .../doc/LambdaPersistencePluginDocTest.java | 12 +- 16 files changed, 238 insertions(+), 141 deletions(-) diff --git a/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java b/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java index 4f45491aa5..acfe70f356 100644 --- a/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java +++ b/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java @@ -8,8 +8,8 @@ import akka.event.Logging; import akka.event.Logging.LoggerInitialized; import akka.japi.Creator; import akka.japi.Pair; +import akka.japi.Util; import akka.japi.tuple.Tuple22; -import akka.japi.tuple.Tuple3; import akka.japi.tuple.Tuple4; import akka.routing.GetRoutees; import akka.routing.FromConfig; @@ -20,6 +20,10 @@ import akka.testkit.TestProbe; import org.junit.ClassRule; import org.junit.Test; +import scala.Option; + +import java.util.Optional; + import static org.junit.Assert.*; public class JavaAPI { @@ -132,6 +136,15 @@ public class JavaAPI { final Tuple4 t4 = Tuple4.create(1, "2", 3, 4L); Tuple22.create(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22); } + + @Test + public void mustBeAbleToCreateOptionFromOptional() { + Option empty = Util.option(Optional.ofNullable(null)); + assertTrue(empty.isEmpty()); + + Option full = Util.option(Optional.ofNullable("hello")); + assertTrue(full.isDefined()); + } public static class ActorWithConstructorParams extends UntypedActor { diff --git a/akka-actor/src/main/scala/akka/japi/JavaAPI.scala b/akka-actor/src/main/scala/akka/japi/JavaAPI.scala index 0a3e4b6575..30721e6c1b 100644 --- a/akka-actor/src/main/scala/akka/japi/JavaAPI.scala +++ b/akka-actor/src/main/scala/akka/japi/JavaAPI.scala @@ -245,4 +245,9 @@ object Util { */ def immutableIndexedSeq[T](iterable: java.lang.Iterable[T]): immutable.IndexedSeq[T] = immutableSeq(iterable).toVector + + // TODO in case we decide to pull in scala-java8-compat methods below could be removed - https://github.com/akka/akka/issues/16247 + + def option[T](jOption: java.util.Optional[T]): scala.Option[T] = + scala.Option(jOption.orElse(null.asInstanceOf[T])) } diff --git a/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java index 1e8da37365..107006f849 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java @@ -7,6 +7,8 @@ package docs.persistence; import java.io.File; import java.util.ArrayList; import java.util.List; +import java.util.Optional; + import akka.actor.*; import akka.dispatch.Futures; import com.typesafe.config.Config; @@ -63,7 +65,7 @@ public class PersistencePluginDocTest { class MySnapshotStore extends SnapshotStore { @Override - public Future> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria) { + public Future> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria) { return null; } @@ -73,16 +75,12 @@ public class PersistencePluginDocTest { } @Override - public void onSaved(SnapshotMetadata metadata) throws Exception { - } - - @Override - public Future doDelete(SnapshotMetadata metadata) throws Exception { + public Future doDeleteAsync(SnapshotMetadata metadata) { return Futures.successful(null); } @Override - public Future doDelete(String persistenceId, SnapshotSelectionCriteria criteria) throws Exception { + public Future doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria) { return Futures.successful(null); } } diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 795fb3105e..8a1634764a 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -404,9 +404,11 @@ saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay a Snapshot deletion ----------------- -A persistent actor can delete individual snapshots by calling the ``deleteSnapshot`` method with the sequence number and the -timestamp of a snapshot as argument. To bulk-delete snapshots matching ``SnapshotSelectionCriteria``, persistent actors should -use the ``deleteSnapshots`` method. +A persistent actor can delete individual snapshots by calling the ``deleteSnapshot`` method with the sequence number of +when the snapshot was taken. + +To bulk-delete a range of snapshots matching ``SnapshotSelectionCriteria``, +persistent actors should use the ``deleteSnapshots`` method. .. _at-least-once-delivery-java: diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst index cc543e5878..61904ae6e9 100644 --- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst @@ -170,29 +170,6 @@ Default interval for TestKit.awaitAssert changed to 100 ms Default check interval changed from 800 ms to 100 ms. You can define the interval explicitly if you need a longer interval. -Akka Persistence -================ - -Mandatory persistenceId ------------------------ - -It is now mandatory to define the ``persistenceId`` in subclasses of ``PersistentActor``, ``UntypedPersistentActor`` -and ``AbstractPersistentId``. - -The rationale behind this change being stricter de-coupling of your Actor hierarchy and the logical -"which persistent entity this actor represents". - -In case you want to preserve the old behavior of providing the actor's path as the default ``persistenceId``, you can easily -implement it yourself either as a helper trait or simply by overriding ``persistenceId`` as follows:: - - override def persistenceId = self.path.toStringWithoutAddress - -Persist sequence of events --------------------------- - -The ``persist`` method that takes a ``Seq`` (Scala) or ``Iterable`` (Java) of events parameter was deprecated and -renamed to ``persistAll`` to avoid mistakes of persisting other collection types as one single event by calling -the overloaded ``persist(event)`` method. Secure Cookies ============== @@ -315,3 +292,96 @@ actor external actor of how to allocate shards or rebalance shards. For the synchronous case you can return the result via ``scala.concurrent.Future.successful`` in Scala or ``akka.dispatch.Futures.successful`` in Java. + +Akka Persistence +================ + +Mendatory persistenceId +----------------------- + +It is now mandatory to define the ``persistenceId`` in subclasses of ``PersistentActor``, ``UntypedPersistentActor`` +and ``AbstractPersistentId``. + +The rationale behind this change being stricter de-coupling of your Actor hierarchy and the logical +"which persistent entity this actor represents". + +In case you want to preserve the old behavior of providing the actor's path as the default ``persistenceId``, you can easily +implement it yourself either as a helper trait or simply by overriding ``persistenceId`` as follows:: + + override def persistenceId = self.path.toStringWithoutAddress + + +Persist sequence of events +-------------------------- + +The ``persist`` method that takes a ``Seq`` (Scala) or ``Iterable`` (Java) of events parameter was deprecated and +renamed to ``persistAll`` to avoid mistakes of persisting other collection types as one single event by calling +the overloaded ``persist(event)`` method. + +Persistence Plugin APIs +======================= + +SnapshotStore: Snapshots can now be deleted asynchronously (and report failures) +-------------------------------------------------------------------------------- +Previously the ``SnapshotStore`` plugin SPI did not allow for asynchronous deletion of snapshots, +and failures of deleting a snapshot may have been even silently ignored. + +Now ``SnapshotStore`` must return a ``Future`` representing the deletion of the snapshot. +If this future completes successfully the ``PersistentActor`` which initiated the snapshotting will +be notified via an ``DeleteSnapshotSuccess`` message. If the deletion fails for some reason a ``DeleteSnapshotFailure`` +will be sent to the actor instead. + +For ``criteria`` based deletion of snapshots (``def deleteSnapshots(criteria: SnapshotSelectionCriteria)``) equivalent +``DeleteSnapshotsSuccess`` and ``DeleteSnapshotsFailure`` messages are sent, which contain the specified criteria, +instead of ``SnapshotMetadata`` as is the case with the single snapshot deletion messages. + +SnapshotStore: Removed 'saved' callback +--------------------------------------- +Snapshot Stores previously were required to implement a ``def saved(meta: SnapshotMetadata): Unit`` method which +would be called upon successful completion of a ``saveAsync`` (``doSaveAsync`` in Java API) snapshot write. + +Currently all journals and snapshot stores perform asynchronous writes and deletes, thus all could potentially benefit +from such callback methods. The only gain these callback give over composing an ``onComplete`` over ``Future`` returned +by the journal or snapshot store is that it is executed in the Actors context, thus it can safely (without additional +synchronization modify its internal state - for example a "pending writes" counter). + +However, this feature was not used by many plugins, and expanding the API to accomodate all callbacks would have grown +the API a lot. Instead, Akka Persistence 2.4.x introduces an additional (optionally overrideable) +``receivePluginInternal:Actor.Receive`` method in the plugin API, which can be used for handling those as well as any custom messages +that are sent to the plugin Actor (imagine use cases like "wake up and continue reading" or custom protocols which your +specialised journal can implement). + +Implementations using the previous feature should adjust their code as follows:: + + // previously + class MySnapshots extends SnapshotStore { + // old API: + // def saved(meta: SnapshotMetadata): Unit = doThings() + + // new API: + def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = { + // completion or failure of the returned future triggers internal messages in receivePluginInternal + val f: Future[Unit] = ??? + + // custom messages can be piped to self in order to be received in receivePluginInternal + f.map(MyCustomMessage(_)) pipeTo self + + f + } + + def receivePluginInternal = { + case SaveSnapshotSuccess(metadata) => doThings() + case MyCustomMessage(data) => doOtherThings() + } + + // ... + } + +SnapshotStore: Java 8 Optional used in Java plugin APIs +------------------------------------------------------- +In places where previously ``akka.japi.Option`` was used in Java APIs, including the return type of ``doLoadAsync``, +the Java 8 provided ``Optional`` type is used now. + +Please remember that when creating an ``java.util.Optional`` instance from a (possibly) ``null`` value you will want to +use the non-throwing ``Optional.fromNullable`` method, which converts a ``null`` into a ``None`` value - which is +slightly different than its Scala counterpart (where ``Option.apply(null)`` returns ``None``). \ No newline at end of file diff --git a/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst index 864c47ada4..b7435d47dd 100644 --- a/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst @@ -203,17 +203,3 @@ To continue using LevelDB based persistence plugins it is now required for relat to include an additional explicit dependency declaration for the LevelDB artifacts. This change allows production akka deployments to avoid need for the LevelDB provisioning. Please see persistence extension ``reference.conf`` for details. - -SnapshotStore: Snapshots can now be deleted asynchronously (and report failures) -================================================================================ -Previously the ``SnapshotStore`` plugin SPI did not allow for asynchronous deletion of snapshots, -and failures of deleting a snapshot may have been even silently ignored. - -Now ``SnapshotStore``s must return a ``Future`` representing the deletion of the snapshot. -If this future completes successfully the ``PersistentActor`` which initiated the snapshotting will -be notified via an ``DeleteSnapshotSuccess`` message. If the deletion fails for some reason a ``DeleteSnapshotFailure`` -will be sent to the actor instead. - -For ``criteria`` based deletion of snapshots (``def deleteSnapshots(criteria: SnapshotSelectionCriteria)``) equivalent -``DeleteSnapshotsSuccess`` and ``DeleteSnapshotsFailure`` messages are sent, which contain the specified criteria, -instead of ``SnapshotMetadata`` as is the case with the single snapshot deletion messages. \ No newline at end of file diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala index 3a3de25018..8684590172 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala @@ -4,6 +4,7 @@ package docs.persistence +import akka.actor.Actor.Receive import akka.actor.ActorSystem import akka.testkit.TestKit import com.typesafe.config._ @@ -132,15 +133,20 @@ class MyJournal extends AsyncWriteJournal { replayCallback: (PersistentRepr) => Unit): Future[Unit] = ??? def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = ??? + + // optionally override: + override def receivePluginInternal: Receive = super.receivePluginInternal } class MySnapshotStore extends SnapshotStore { def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = ??? def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = ??? - def saved(metadata: SnapshotMetadata): Unit = ??? def deleteAsync(metadata: SnapshotMetadata): Future[Unit] = ??? def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = ??? + + // optionally override: + override def receivePluginInternal: Receive = super.receivePluginInternal } object PersistenceTCKDoc { diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index b25bbec4b4..22727ee49a 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -395,9 +395,11 @@ saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay a Snapshot deletion ----------------- -A persistent actor can delete individual snapshots by calling the ``deleteSnapshot`` method with the sequence number and the -timestamp of a snapshot as argument. To bulk-delete snapshots matching ``SnapshotSelectionCriteria``, persistent actors should -use the ``deleteSnapshots`` method. +A persistent actor can delete individual snapshots by calling the ``deleteSnapshot`` method with the sequence number of +when the snapshot was taken. + +To bulk-delete a range of snapshots matching ``SnapshotSelectionCriteria``, +persistent actors should use the ``deleteSnapshots`` method. .. _at-least-once-delivery: diff --git a/akka-persistence/src/main/java/akka/persistence/snapshot/japi/SnapshotStorePlugin.java b/akka-persistence/src/main/java/akka/persistence/snapshot/japi/SnapshotStorePlugin.java index 24ec4d8508..3c32516726 100644 --- a/akka-persistence/src/main/java/akka/persistence/snapshot/japi/SnapshotStorePlugin.java +++ b/akka-persistence/src/main/java/akka/persistence/snapshot/japi/SnapshotStorePlugin.java @@ -4,10 +4,12 @@ package akka.persistence.snapshot.japi; +import akka.persistence.SelectedSnapshot; +import akka.persistence.SnapshotMetadata; +import akka.persistence.SnapshotSelectionCriteria; import scala.concurrent.Future; -import akka.japi.Option; -import akka.persistence.*; +import java.util.Optional; interface SnapshotStorePlugin { //#snapshot-store-plugin-api @@ -19,7 +21,7 @@ interface SnapshotStorePlugin { * @param criteria * selection criteria for loading. */ - Future> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria); + Future> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria); /** * Java API, Plugin API: asynchronously saves a snapshot. @@ -31,21 +33,13 @@ interface SnapshotStorePlugin { */ Future doSaveAsync(SnapshotMetadata metadata, Object snapshot); - /** - * Java API, Plugin API: called after successful saving of a snapshot. - * - * @param metadata - * snapshot metadata. - */ - void onSaved(SnapshotMetadata metadata) throws Exception; - /** * Java API, Plugin API: deletes the snapshot identified by `metadata`. * * @param metadata * snapshot metadata. */ - Future doDelete(SnapshotMetadata metadata) throws Exception; + Future doDeleteAsync(SnapshotMetadata metadata); /** * Java API, Plugin API: deletes all snapshots matching `criteria`. @@ -55,6 +49,6 @@ interface SnapshotStorePlugin { * @param criteria * selection criteria for deleting. */ - Future doDelete(String persistenceId, SnapshotSelectionCriteria criteria) throws Exception; + Future doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria); //#snapshot-store-plugin-api } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index 1cbf85732a..07ca18d587 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -27,7 +27,9 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { private val resequencer = context.actorOf(Props[Resequencer]()) private var resequencerCounter = 1L - def receive = { + final def receive = receiveWriteJournal orElse receivePluginInternal + + final val receiveWriteJournal: Actor.Receive = { case WriteMessages(messages, persistentActor, actorInstanceId) ⇒ val cctr = resequencerCounter def resequence(f: PersistentRepr ⇒ Any) = messages.zipWithIndex.foreach { @@ -43,6 +45,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { resequence(WriteMessageFailure(_, e, actorInstanceId)) } resequencerCounter += messages.length + 1 + case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor, replayDeleted) ⇒ // Send replayed messages and replay result to persistentActor directly. No need // to resequence replayed messages relative to written and looped messages. @@ -58,6 +61,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { } pipeTo persistentActor onSuccess { case _ if publish ⇒ context.system.eventStream.publish(r) } + case ReadHighestSequenceNr(fromSequenceNr, persistenceId, persistentActor) ⇒ // Send read highest sequence number to persistentActor directly. No need // to resequence the result relative to written and looped messages. @@ -66,6 +70,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { } recover { case e ⇒ ReadHighestSequenceNrFailure(e) } pipeTo persistentActor + case d @ DeleteMessagesTo(persistenceId, toSequenceNr, permanent) ⇒ asyncDeleteMessagesTo(persistenceId, toSequenceNr, permanent) onComplete { case Success(_) ⇒ if (publish) context.system.eventStream.publish(d) @@ -87,6 +92,14 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { * as deleted, otherwise they are permanently deleted. */ def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit] + + /** + * Plugin API + * + * Allows plugin implementers to use `f pipeTo self` and + * handle additional messages for implementing advanced features + */ + def receivePluginInternal: Actor.Receive = Actor.emptyBehavior //#journal-plugin-api } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala index 036e0367e5..219cdee103 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala @@ -4,17 +4,17 @@ package akka.persistence.journal -import scala.collection.immutable -import scala.concurrent._ -import scala.concurrent.duration.Duration -import scala.language.postfixOps - import akka.AkkaException import akka.actor._ import akka.pattern.ask import akka.persistence._ import akka.util._ +import scala.collection.immutable +import scala.concurrent._ +import scala.concurrent.duration.Duration +import scala.language.postfixOps + /** * INTERNAL API. * @@ -24,17 +24,18 @@ private[persistence] trait AsyncWriteProxy extends AsyncWriteJournal with Stash import AsyncWriteProxy._ import AsyncWriteTarget._ - private val initialized = super.receive + private var isInitialized = false private var store: ActorRef = _ - override def receive = { - case SetStore(ref) ⇒ - store = ref - unstashAll() - context.become(initialized) - case x ⇒ - stash() - } + override protected[akka] def aroundReceive(receive: Receive, msg: Any): Unit = + if (isInitialized) super.aroundReceive(receive, msg) + else msg match { + case SetStore(ref) ⇒ + store = ref + unstashAll() + isInitialized = true + case _ ⇒ stash() + } implicit def timeout: Timeout diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala index 5c01614282..e1acbb992e 100644 --- a/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala @@ -21,54 +21,67 @@ trait SnapshotStore extends Actor with ActorLogging { private val extension = Persistence(context.system) private val publish = extension.settings.internal.publishPluginCommands - final def receive = { + final def receive = receiveSnapshotStore.orElse[Any, Unit](receivePluginInternal) + + final val receiveSnapshotStore: Actor.Receive = { case LoadSnapshot(persistenceId, criteria, toSequenceNr) ⇒ - val p = sender() loadAsync(persistenceId, criteria.limit(toSequenceNr)) map { sso ⇒ LoadSnapshotResult(sso, toSequenceNr) } recover { case e ⇒ LoadSnapshotResult(None, toSequenceNr) - } pipeTo p + } pipeTo senderPersistentActor() case SaveSnapshot(metadata, snapshot) ⇒ - val p = sender() val md = metadata.copy(timestamp = System.currentTimeMillis) saveAsync(md, snapshot) map { _ ⇒ SaveSnapshotSuccess(md) } recover { case e ⇒ SaveSnapshotFailure(metadata, e) - } to (self, p) - - case evt @ SaveSnapshotSuccess(metadata) ⇒ - try saved(metadata) finally sender() ! evt // sender is persistentActor + } to (self, senderPersistentActor()) + case evt: SaveSnapshotSuccess ⇒ + try tryReceivePluginInternal(evt) finally senderPersistentActor ! evt // sender is persistentActor case evt @ SaveSnapshotFailure(metadata, _) ⇒ - try deleteAsync(metadata) finally sender() ! evt // sender is persistentActor + try { + tryReceivePluginInternal(evt) + deleteAsync(metadata) + } finally senderPersistentActor() ! evt // sender is persistentActor case d @ DeleteSnapshot(metadata) ⇒ - val p = sender() deleteAsync(metadata) map { - case _ ⇒ - log.warning("deleting by: " + d) - DeleteSnapshotSuccess(metadata) + case _ ⇒ DeleteSnapshotSuccess(metadata) } recover { case e ⇒ DeleteSnapshotFailure(metadata, e) - } pipeTo p onComplete { - case _ if publish ⇒ context.system.eventStream.publish(d) - } + } to (self, senderPersistentActor()) + if (publish) context.system.eventStream.publish(d) + + case evt: DeleteSnapshotSuccess ⇒ + try tryReceivePluginInternal(evt) finally senderPersistentActor() ! evt + case evt: DeleteSnapshotFailure ⇒ + try tryReceivePluginInternal(evt) finally senderPersistentActor() ! evt case d @ DeleteSnapshots(persistenceId, criteria) ⇒ - val p = sender() deleteAsync(persistenceId, criteria) map { case _ ⇒ DeleteSnapshotsSuccess(criteria) } recover { case e ⇒ DeleteSnapshotsFailure(criteria, e) - } pipeTo p onComplete { - case _ if publish ⇒ context.system.eventStream.publish(d) - } + } to (self, senderPersistentActor()) + if (publish) context.system.eventStream.publish(d) + + case evt: DeleteSnapshotsFailure ⇒ + try tryReceivePluginInternal(evt) finally senderPersistentActor() ! evt // sender is persistentActor + case evt: DeleteSnapshotsSuccess ⇒ + try tryReceivePluginInternal(evt) finally senderPersistentActor() ! evt } + /** Documents intent that the sender() is expected to be the PersistentActor */ + @inline private final def senderPersistentActor(): ActorRef = sender() + + private def tryReceivePluginInternal(evt: Any): Unit = + if (receivePluginInternal.isDefinedAt(evt)) receivePluginInternal(evt) + //#snapshot-store-plugin-api + /** * Plugin API: asynchronously loads a snapshot. * @@ -85,13 +98,6 @@ trait SnapshotStore extends Actor with ActorLogging { */ def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] - /** - * Plugin API: called after successful saving of a snapshot. - * - * @param metadata snapshot metadata. - */ - def saved(metadata: SnapshotMetadata): Unit - /** * Plugin API: deletes the snapshot identified by `metadata`. * @@ -107,5 +113,12 @@ trait SnapshotStore extends Actor with ActorLogging { * @param criteria selection criteria for deleting. */ def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] + + /** + * Plugin API + * Allows plugin implementers to use `f pipeTo self` and + * handle additional messages for implementing advanced features + */ + def receivePluginInternal: Actor.Receive = Actor.emptyBehavior //#snapshot-store-plugin-api } diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/japi/SnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/japi/SnapshotStore.scala index bd3ddd033f..534423ca84 100644 --- a/akka-persistence/src/main/scala/akka/persistence/snapshot/japi/SnapshotStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/japi/SnapshotStore.scala @@ -4,9 +4,9 @@ package akka.persistence.snapshot.japi -import akka.japi.{ Option ⇒ JOption } import akka.persistence._ import akka.persistence.snapshot.{ SnapshotStore ⇒ SSnapshotStore } +import akka.japi.Util._ import scala.concurrent.Future @@ -16,19 +16,16 @@ import scala.concurrent.Future abstract class SnapshotStore extends SSnapshotStore with SnapshotStorePlugin { import context.dispatcher - final def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria) = - doLoadAsync(persistenceId, criteria).map(_.asScala) + override final def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = + doLoadAsync(persistenceId, criteria).map(option) - final def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = + override final def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = doSaveAsync(metadata, snapshot).map(Unit.unbox) - final def saved(metadata: SnapshotMetadata): Unit = - onSaved(metadata) + override final def deleteAsync(metadata: SnapshotMetadata): Future[Unit] = + doDeleteAsync(metadata).map(_ ⇒ ()) - final def delete(metadata: SnapshotMetadata): Future[Unit] = - doDelete(metadata).map(_ ⇒ ()) - - final def delete(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = - doDelete(persistenceId: String, criteria: SnapshotSelectionCriteria).map(_ ⇒ ()) + override final def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = + doDeleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria).map(_ ⇒ ()) } diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala index 134c50a86e..4dc1ff6cf5 100644 --- a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala @@ -20,7 +20,7 @@ import scala.concurrent.Future import scala.util._ /** - * INTERNAL API. + * INTERNAL API * * Local filesystem backed snapshot store. */ @@ -38,7 +38,7 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo private val serializationExtension = SerializationExtension(context.system) private var saving = immutable.Set.empty[SnapshotMetadata] // saving in progress - def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = { + override def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = { // // Heuristics: // @@ -50,16 +50,13 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo Future(load(metadata))(streamDispatcher) } - def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = { + override def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = { saving += metadata - Future(save(metadata, snapshot))(streamDispatcher) + val completion = Future(save(metadata, snapshot))(streamDispatcher) + completion } - def saved(metadata: SnapshotMetadata): Unit = { - saving -= metadata - } - - def deleteAsync(metadata: SnapshotMetadata): Future[Unit] = { + override def deleteAsync(metadata: SnapshotMetadata): Future[Unit] = { saving -= metadata Future { // multiple snapshot files here mean that there were multiple snapshots for this seqNr, we delete all of them @@ -69,13 +66,20 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo }(streamDispatcher).map(_ ⇒ ())(streamDispatcher) } - def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = { + override def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = { val metadatas = snapshotMetadatas(persistenceId, criteria) Future.sequence { metadatas.map(deleteAsync) }(collection.breakOut, streamDispatcher).map(_ ⇒ ())(streamDispatcher) } + override def receivePluginInternal: Receive = { + case SaveSnapshotSuccess(metadata) ⇒ saving -= metadata + case _: SaveSnapshotFailure ⇒ // ignore + case _: DeleteSnapshotsSuccess ⇒ // ignore + case _: DeleteSnapshotsFailure ⇒ // ignore + } + private def snapshotFiles(metadata: SnapshotMetadata): immutable.Seq[File] = { // pick all files for this persistenceId and sequenceNr, old journals could have created multiple entries with appended timestamps snapshotDir.listFiles(new SnapshotSeqNrFilenameFilter(metadata.persistenceId, metadata.sequenceNr)).toVector diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala index 89c07f7b67..350e45d160 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala @@ -178,7 +178,7 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn persistentActor1 ! Recover(toSequenceNr = 4) persistentActor1 ! "done" - val metadata = expectMsgPF(hint = "" + SnapshotOffer(SnapshotMetadata(persistenceId, 4, 0), null)) { + val metadata = expectMsgPF() { case SnapshotOffer(md @ SnapshotMetadata(`persistenceId`, 4, _), state) ⇒ state should ===(List("a-1", "b-2", "c-3", "d-4").reverse) md @@ -188,10 +188,7 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn persistentActor1 ! Delete1(metadata) deleteProbe.expectMsgType[DeleteSnapshot] - expectMsgPF(hint = "" + DeleteSnapshotSuccess(SnapshotMetadata(`persistenceId`, 4, 0))) { - case m @ DeleteSnapshotSuccess(SnapshotMetadata(`persistenceId`, 4, _)) ⇒ - info("success = " + m) - } + expectMsgPF() { case m @ DeleteSnapshotSuccess(SnapshotMetadata(`persistenceId`, 4, _)) ⇒ } // recover persistentActor from 2nd snapshot (3rd was deleted) plus replayed messages val persistentActor2 = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, testActor)) diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java index fd7b86484f..de5f85de5f 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java @@ -16,8 +16,8 @@ import akka.persistence.journal.leveldb.SharedLeveldbJournal; import akka.persistence.journal.leveldb.SharedLeveldbStore; import akka.japi.pf.ReceiveBuilder; import scala.concurrent.Future; -import akka.japi.Option; import akka.japi.Procedure; +import java.util.Optional; public class LambdaPersistencePluginDocTest { @@ -56,7 +56,7 @@ public class LambdaPersistencePluginDocTest { class MySnapshotStore extends SnapshotStore { @Override - public Future> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria) { + public Future> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria) { return null; } @@ -66,16 +66,12 @@ public class LambdaPersistencePluginDocTest { } @Override - public void onSaved(SnapshotMetadata metadata) throws Exception { - } - - @Override - public Future doDelete(SnapshotMetadata metadata) throws Exception { + public Future doDeleteAsync(SnapshotMetadata metadata) { return Futures.successful(null); } @Override - public Future doDelete(String persistenceId, SnapshotSelectionCriteria criteria) throws Exception { + public Future doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria) { return Futures.successful(null); } }