diff --git a/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java b/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java index 4f45491aa5..acfe70f356 100644 --- a/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java +++ b/akka-actor-tests/src/test/java/akka/actor/JavaAPI.java @@ -8,8 +8,8 @@ import akka.event.Logging; import akka.event.Logging.LoggerInitialized; import akka.japi.Creator; import akka.japi.Pair; +import akka.japi.Util; import akka.japi.tuple.Tuple22; -import akka.japi.tuple.Tuple3; import akka.japi.tuple.Tuple4; import akka.routing.GetRoutees; import akka.routing.FromConfig; @@ -20,6 +20,10 @@ import akka.testkit.TestProbe; import org.junit.ClassRule; import org.junit.Test; +import scala.Option; + +import java.util.Optional; + import static org.junit.Assert.*; public class JavaAPI { @@ -132,6 +136,15 @@ public class JavaAPI { final Tuple4 t4 = Tuple4.create(1, "2", 3, 4L); Tuple22.create(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22); } + + @Test + public void mustBeAbleToCreateOptionFromOptional() { + Option empty = Util.option(Optional.ofNullable(null)); + assertTrue(empty.isEmpty()); + + Option full = Util.option(Optional.ofNullable("hello")); + assertTrue(full.isDefined()); + } public static class ActorWithConstructorParams extends UntypedActor { diff --git a/akka-actor/src/main/scala/akka/japi/JavaAPI.scala b/akka-actor/src/main/scala/akka/japi/JavaAPI.scala index 0a3e4b6575..30721e6c1b 100644 --- a/akka-actor/src/main/scala/akka/japi/JavaAPI.scala +++ b/akka-actor/src/main/scala/akka/japi/JavaAPI.scala @@ -245,4 +245,9 @@ object Util { */ def immutableIndexedSeq[T](iterable: java.lang.Iterable[T]): immutable.IndexedSeq[T] = immutableSeq(iterable).toVector + + // TODO in case we decide to pull in scala-java8-compat methods below could be removed - https://github.com/akka/akka/issues/16247 + + def option[T](jOption: java.util.Optional[T]): scala.Option[T] = + scala.Option(jOption.orElse(null.asInstanceOf[T])) } diff --git a/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java index a6f78eeae4..107006f849 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java @@ -7,7 +7,10 @@ package docs.persistence; import java.io.File; import java.util.ArrayList; import java.util.List; +import java.util.Optional; + import akka.actor.*; +import akka.dispatch.Futures; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import org.iq80.leveldb.util.FileUtils; @@ -62,7 +65,7 @@ public class PersistencePluginDocTest { class MySnapshotStore extends SnapshotStore { @Override - public Future> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria) { + public Future> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria) { return null; } @@ -72,15 +75,13 @@ public class PersistencePluginDocTest { } @Override - public void onSaved(SnapshotMetadata metadata) throws Exception { + public Future doDeleteAsync(SnapshotMetadata metadata) { + return Futures.successful(null); } @Override - public void doDelete(SnapshotMetadata metadata) throws Exception { - } - - @Override - public void doDelete(String persistenceId, SnapshotSelectionCriteria criteria) throws Exception { + public Future doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria) { + return Futures.successful(null); } } diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 795fb3105e..8a1634764a 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -404,9 +404,11 @@ saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay a Snapshot deletion ----------------- -A persistent actor can delete individual snapshots by calling the ``deleteSnapshot`` method with the sequence number and the -timestamp of a snapshot as argument. To bulk-delete snapshots matching ``SnapshotSelectionCriteria``, persistent actors should -use the ``deleteSnapshots`` method. +A persistent actor can delete individual snapshots by calling the ``deleteSnapshot`` method with the sequence number of +when the snapshot was taken. + +To bulk-delete a range of snapshots matching ``SnapshotSelectionCriteria``, +persistent actors should use the ``deleteSnapshots`` method. .. _at-least-once-delivery-java: diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst index cc543e5878..61904ae6e9 100644 --- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst @@ -170,29 +170,6 @@ Default interval for TestKit.awaitAssert changed to 100 ms Default check interval changed from 800 ms to 100 ms. You can define the interval explicitly if you need a longer interval. -Akka Persistence -================ - -Mandatory persistenceId ------------------------ - -It is now mandatory to define the ``persistenceId`` in subclasses of ``PersistentActor``, ``UntypedPersistentActor`` -and ``AbstractPersistentId``. - -The rationale behind this change being stricter de-coupling of your Actor hierarchy and the logical -"which persistent entity this actor represents". - -In case you want to preserve the old behavior of providing the actor's path as the default ``persistenceId``, you can easily -implement it yourself either as a helper trait or simply by overriding ``persistenceId`` as follows:: - - override def persistenceId = self.path.toStringWithoutAddress - -Persist sequence of events --------------------------- - -The ``persist`` method that takes a ``Seq`` (Scala) or ``Iterable`` (Java) of events parameter was deprecated and -renamed to ``persistAll`` to avoid mistakes of persisting other collection types as one single event by calling -the overloaded ``persist(event)`` method. Secure Cookies ============== @@ -315,3 +292,96 @@ actor external actor of how to allocate shards or rebalance shards. For the synchronous case you can return the result via ``scala.concurrent.Future.successful`` in Scala or ``akka.dispatch.Futures.successful`` in Java. + +Akka Persistence +================ + +Mendatory persistenceId +----------------------- + +It is now mandatory to define the ``persistenceId`` in subclasses of ``PersistentActor``, ``UntypedPersistentActor`` +and ``AbstractPersistentId``. + +The rationale behind this change being stricter de-coupling of your Actor hierarchy and the logical +"which persistent entity this actor represents". + +In case you want to preserve the old behavior of providing the actor's path as the default ``persistenceId``, you can easily +implement it yourself either as a helper trait or simply by overriding ``persistenceId`` as follows:: + + override def persistenceId = self.path.toStringWithoutAddress + + +Persist sequence of events +-------------------------- + +The ``persist`` method that takes a ``Seq`` (Scala) or ``Iterable`` (Java) of events parameter was deprecated and +renamed to ``persistAll`` to avoid mistakes of persisting other collection types as one single event by calling +the overloaded ``persist(event)`` method. + +Persistence Plugin APIs +======================= + +SnapshotStore: Snapshots can now be deleted asynchronously (and report failures) +-------------------------------------------------------------------------------- +Previously the ``SnapshotStore`` plugin SPI did not allow for asynchronous deletion of snapshots, +and failures of deleting a snapshot may have been even silently ignored. + +Now ``SnapshotStore`` must return a ``Future`` representing the deletion of the snapshot. +If this future completes successfully the ``PersistentActor`` which initiated the snapshotting will +be notified via an ``DeleteSnapshotSuccess`` message. If the deletion fails for some reason a ``DeleteSnapshotFailure`` +will be sent to the actor instead. + +For ``criteria`` based deletion of snapshots (``def deleteSnapshots(criteria: SnapshotSelectionCriteria)``) equivalent +``DeleteSnapshotsSuccess`` and ``DeleteSnapshotsFailure`` messages are sent, which contain the specified criteria, +instead of ``SnapshotMetadata`` as is the case with the single snapshot deletion messages. + +SnapshotStore: Removed 'saved' callback +--------------------------------------- +Snapshot Stores previously were required to implement a ``def saved(meta: SnapshotMetadata): Unit`` method which +would be called upon successful completion of a ``saveAsync`` (``doSaveAsync`` in Java API) snapshot write. + +Currently all journals and snapshot stores perform asynchronous writes and deletes, thus all could potentially benefit +from such callback methods. The only gain these callback give over composing an ``onComplete`` over ``Future`` returned +by the journal or snapshot store is that it is executed in the Actors context, thus it can safely (without additional +synchronization modify its internal state - for example a "pending writes" counter). + +However, this feature was not used by many plugins, and expanding the API to accomodate all callbacks would have grown +the API a lot. Instead, Akka Persistence 2.4.x introduces an additional (optionally overrideable) +``receivePluginInternal:Actor.Receive`` method in the plugin API, which can be used for handling those as well as any custom messages +that are sent to the plugin Actor (imagine use cases like "wake up and continue reading" or custom protocols which your +specialised journal can implement). + +Implementations using the previous feature should adjust their code as follows:: + + // previously + class MySnapshots extends SnapshotStore { + // old API: + // def saved(meta: SnapshotMetadata): Unit = doThings() + + // new API: + def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = { + // completion or failure of the returned future triggers internal messages in receivePluginInternal + val f: Future[Unit] = ??? + + // custom messages can be piped to self in order to be received in receivePluginInternal + f.map(MyCustomMessage(_)) pipeTo self + + f + } + + def receivePluginInternal = { + case SaveSnapshotSuccess(metadata) => doThings() + case MyCustomMessage(data) => doOtherThings() + } + + // ... + } + +SnapshotStore: Java 8 Optional used in Java plugin APIs +------------------------------------------------------- +In places where previously ``akka.japi.Option`` was used in Java APIs, including the return type of ``doLoadAsync``, +the Java 8 provided ``Optional`` type is used now. + +Please remember that when creating an ``java.util.Optional`` instance from a (possibly) ``null`` value you will want to +use the non-throwing ``Optional.fromNullable`` method, which converts a ``null`` into a ``None`` value - which is +slightly different than its Scala counterpart (where ``Option.apply(null)`` returns ``None``). \ No newline at end of file diff --git a/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst index a06a4d488a..b7435d47dd 100644 --- a/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst @@ -8,7 +8,6 @@ Migration Guide Akka Persistence (experimental) 2.3.3 to 2.3.4 (and 2.4.x) is provided for Persistence while under the *experimental* flag. The goal of this phase is to gather user feedback before we freeze the APIs in a major release. - defer renamed to deferAsync =========================== The ``defer`` method in ``PersistentActor`` was renamed to ``deferAsync`` as it matches the semantics diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala index eb8479536b..8684590172 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala @@ -4,6 +4,7 @@ package docs.persistence +import akka.actor.Actor.Receive import akka.actor.ActorSystem import akka.testkit.TestKit import com.typesafe.config._ @@ -132,15 +133,20 @@ class MyJournal extends AsyncWriteJournal { replayCallback: (PersistentRepr) => Unit): Future[Unit] = ??? def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = ??? + + // optionally override: + override def receivePluginInternal: Receive = super.receivePluginInternal } class MySnapshotStore extends SnapshotStore { def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = ??? def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = ??? - def saved(metadata: SnapshotMetadata): Unit = ??? - def delete(metadata: SnapshotMetadata): Unit = ??? - def delete(persistenceId: String, criteria: SnapshotSelectionCriteria): Unit = ??? + def deleteAsync(metadata: SnapshotMetadata): Future[Unit] = ??? + def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = ??? + + // optionally override: + override def receivePluginInternal: Receive = super.receivePluginInternal } object PersistenceTCKDoc { diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index b25bbec4b4..22727ee49a 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -395,9 +395,11 @@ saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay a Snapshot deletion ----------------- -A persistent actor can delete individual snapshots by calling the ``deleteSnapshot`` method with the sequence number and the -timestamp of a snapshot as argument. To bulk-delete snapshots matching ``SnapshotSelectionCriteria``, persistent actors should -use the ``deleteSnapshots`` method. +A persistent actor can delete individual snapshots by calling the ``deleteSnapshot`` method with the sequence number of +when the snapshot was taken. + +To bulk-delete a range of snapshots matching ``SnapshotSelectionCriteria``, +persistent actors should use the ``deleteSnapshots`` method. .. _at-least-once-delivery: diff --git a/akka-persistence-tck/src/test/scala/akka/persistence/snapshot/local/LocalSnapshotStoreSpec.scala b/akka-persistence-tck/src/test/scala/akka/persistence/snapshot/local/LocalSnapshotStoreSpec.scala index db5991db04..998696e693 100644 --- a/akka-persistence-tck/src/test/scala/akka/persistence/snapshot/local/LocalSnapshotStoreSpec.scala +++ b/akka-persistence-tck/src/test/scala/akka/persistence/snapshot/local/LocalSnapshotStoreSpec.scala @@ -12,4 +12,4 @@ class LocalSnapshotStoreSpec extends SnapshotStoreSpec( akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" akka.persistence.snapshot-store.local.dir = "target/snapshots" """)) - with PluginCleanup + with PluginCleanup \ No newline at end of file diff --git a/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java b/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java index b595aac4bb..87e2d2a59f 100644 --- a/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java +++ b/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java @@ -5,6 +5,7 @@ package akka.persistence.journal.japi; import akka.persistence.*; +import scala.concurrent.Future; interface SyncWritePlugin { //#sync-write-plugin-api diff --git a/akka-persistence/src/main/java/akka/persistence/snapshot/japi/SnapshotStorePlugin.java b/akka-persistence/src/main/java/akka/persistence/snapshot/japi/SnapshotStorePlugin.java index ec43378cbd..3c32516726 100644 --- a/akka-persistence/src/main/java/akka/persistence/snapshot/japi/SnapshotStorePlugin.java +++ b/akka-persistence/src/main/java/akka/persistence/snapshot/japi/SnapshotStorePlugin.java @@ -4,10 +4,12 @@ package akka.persistence.snapshot.japi; +import akka.persistence.SelectedSnapshot; +import akka.persistence.SnapshotMetadata; +import akka.persistence.SnapshotSelectionCriteria; import scala.concurrent.Future; -import akka.japi.Option; -import akka.persistence.*; +import java.util.Optional; interface SnapshotStorePlugin { //#snapshot-store-plugin-api @@ -19,7 +21,7 @@ interface SnapshotStorePlugin { * @param criteria * selection criteria for loading. */ - Future> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria); + Future> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria); /** * Java API, Plugin API: asynchronously saves a snapshot. @@ -31,21 +33,13 @@ interface SnapshotStorePlugin { */ Future doSaveAsync(SnapshotMetadata metadata, Object snapshot); - /** - * Java API, Plugin API: called after successful saving of a snapshot. - * - * @param metadata - * snapshot metadata. - */ - void onSaved(SnapshotMetadata metadata) throws Exception; - /** * Java API, Plugin API: deletes the snapshot identified by `metadata`. * * @param metadata * snapshot metadata. */ - void doDelete(SnapshotMetadata metadata) throws Exception; + Future doDeleteAsync(SnapshotMetadata metadata); /** * Java API, Plugin API: deletes all snapshots matching `criteria`. @@ -55,6 +49,6 @@ interface SnapshotStorePlugin { * @param criteria * selection criteria for deleting. */ - void doDelete(String persistenceId, SnapshotSelectionCriteria criteria) throws Exception; + Future doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria); //#snapshot-store-plugin-api } diff --git a/akka-persistence/src/main/resources/reference.conf b/akka-persistence/src/main/resources/reference.conf index a2820916ff..52eafb5984 100644 --- a/akka-persistence/src/main/resources/reference.conf +++ b/akka-persistence/src/main/resources/reference.conf @@ -116,6 +116,10 @@ akka.persistence.snapshot-store.local { stream-dispatcher = "akka.persistence.dispatchers.default-stream-dispatcher" # Storage location of snapshot files. dir = "snapshots" + # Number load attempts when recovering from the latest snapshot fails + # yet older snapshot files are available. Each recovery attempt will try + # to recover using an older than previously failed-on snapshot file (if any are present). + max-load-attempts = 3 } # LevelDB journal plugin. diff --git a/akka-persistence/src/main/scala/akka/persistence/SnapshotProtocol.scala b/akka-persistence/src/main/scala/akka/persistence/SnapshotProtocol.scala index f7f35e0308..526e2b1750 100644 --- a/akka-persistence/src/main/scala/akka/persistence/SnapshotProtocol.scala +++ b/akka-persistence/src/main/scala/akka/persistence/SnapshotProtocol.scala @@ -10,7 +10,7 @@ package akka.persistence * * @param persistenceId id of persistent actor from which the snapshot was taken. * @param sequenceNr sequence number at which the snapshot was taken. - * @param timestamp time at which the snapshot was saved. + * @param timestamp time at which the snapshot was saved, defaults to 0 when unknown. */ @SerialVersionUID(1L) // //#snapshot-metadata @@ -26,6 +26,24 @@ final case class SnapshotMetadata(persistenceId: String, sequenceNr: Long, times final case class SaveSnapshotSuccess(metadata: SnapshotMetadata) extends SnapshotProtocol.Response +/** + * Sent to a [[PersistentActor]] after successful deletion of a snapshot. + * + * @param metadata snapshot metadata. + */ +@SerialVersionUID(1L) +final case class DeleteSnapshotSuccess(metadata: SnapshotMetadata) + extends SnapshotProtocol.Response + +/** + * Sent to a [[PersistentActor]] after successful deletion of specified range of snapshots. + * + * @param criteria snapshot selection criteria. + */ +@SerialVersionUID(1L) +final case class DeleteSnapshotsSuccess(criteria: SnapshotSelectionCriteria) + extends SnapshotProtocol.Response + /** * Sent to a [[PersistentActor]] after failed saving of a snapshot. * @@ -36,6 +54,26 @@ final case class SaveSnapshotSuccess(metadata: SnapshotMetadata) final case class SaveSnapshotFailure(metadata: SnapshotMetadata, cause: Throwable) extends SnapshotProtocol.Response +/** + * Sent to a [[PersistentActor]] after failed deletion of a snapshot. + * + * @param metadata snapshot metadata. + * @param cause failure cause. + */ +@SerialVersionUID(1L) +final case class DeleteSnapshotFailure(metadata: SnapshotMetadata, cause: Throwable) + extends SnapshotProtocol.Response + +/** + * Sent to a [[PersistentActor]] after failed deletion of a range of snapshots. + * + * @param criteria snapshot selection criteria. + * @param cause failure cause. + */ +@SerialVersionUID(1L) +final case class DeleteSnapshotsFailure(criteria: SnapshotSelectionCriteria, cause: Throwable) + extends SnapshotProtocol.Response + /** * Offers a [[PersistentActor]] a previously saved `snapshot` during recovery. This offer is received * before any further replayed messages. diff --git a/akka-persistence/src/main/scala/akka/persistence/Snapshotter.scala b/akka-persistence/src/main/scala/akka/persistence/Snapshotter.scala index d88aeffcab..31b9312723 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Snapshotter.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Snapshotter.scala @@ -25,26 +25,38 @@ trait Snapshotter extends Actor { */ def snapshotSequenceNr: Long + /** + * Instructs the snapshot store to load the specified snapshot and send it via an [[SnapshotOffer]] + * to the running [[PersistentActor]]. + */ def loadSnapshot(persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long) = snapshotStore ! LoadSnapshot(persistenceId, criteria, toSequenceNr) /** - * Saves a `snapshot` of this snapshotter's state. If saving succeeds, this snapshotter will receive a - * [[SaveSnapshotSuccess]] message, otherwise a [[SaveSnapshotFailure]] message. + * Saves a `snapshot` of this snapshotter's state. + * + * The [[PersistentActor]] will be notified about the success or failure of this + * via an [[SaveSnapshotSuccess]] or [[SaveSnapshotFailure]] message. */ def saveSnapshot(snapshot: Any): Unit = { snapshotStore ! SaveSnapshot(SnapshotMetadata(snapshotterId, snapshotSequenceNr), snapshot) } /** - * Deletes a snapshot identified by `sequenceNr` and `timestamp`. + * Deletes the snapshot identified by `sequenceNr`. + * + * The [[PersistentActor]] will be notified about the status of the deletion + * via an [[DeleteSnapshotSuccess]] or [[DeleteSnapshotFailure]] message. */ - def deleteSnapshot(sequenceNr: Long, timestamp: Long): Unit = { - snapshotStore ! DeleteSnapshot(SnapshotMetadata(snapshotterId, sequenceNr, timestamp)) + def deleteSnapshot(sequenceNr: Long): Unit = { + snapshotStore ! DeleteSnapshot(SnapshotMetadata(snapshotterId, sequenceNr)) } /** * Deletes all snapshots matching `criteria`. + * + * The [[PersistentActor]] will be notified about the status of the deletion + * via an [[DeleteSnapshotsSuccess]] or [[DeleteSnapshotsFailure]] message. */ def deleteSnapshots(criteria: SnapshotSelectionCriteria): Unit = { snapshotStore ! DeleteSnapshots(snapshotterId, criteria) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index 1cbf85732a..07ca18d587 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -27,7 +27,9 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { private val resequencer = context.actorOf(Props[Resequencer]()) private var resequencerCounter = 1L - def receive = { + final def receive = receiveWriteJournal orElse receivePluginInternal + + final val receiveWriteJournal: Actor.Receive = { case WriteMessages(messages, persistentActor, actorInstanceId) ⇒ val cctr = resequencerCounter def resequence(f: PersistentRepr ⇒ Any) = messages.zipWithIndex.foreach { @@ -43,6 +45,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { resequence(WriteMessageFailure(_, e, actorInstanceId)) } resequencerCounter += messages.length + 1 + case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor, replayDeleted) ⇒ // Send replayed messages and replay result to persistentActor directly. No need // to resequence replayed messages relative to written and looped messages. @@ -58,6 +61,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { } pipeTo persistentActor onSuccess { case _ if publish ⇒ context.system.eventStream.publish(r) } + case ReadHighestSequenceNr(fromSequenceNr, persistenceId, persistentActor) ⇒ // Send read highest sequence number to persistentActor directly. No need // to resequence the result relative to written and looped messages. @@ -66,6 +70,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { } recover { case e ⇒ ReadHighestSequenceNrFailure(e) } pipeTo persistentActor + case d @ DeleteMessagesTo(persistenceId, toSequenceNr, permanent) ⇒ asyncDeleteMessagesTo(persistenceId, toSequenceNr, permanent) onComplete { case Success(_) ⇒ if (publish) context.system.eventStream.publish(d) @@ -87,6 +92,14 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { * as deleted, otherwise they are permanently deleted. */ def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit] + + /** + * Plugin API + * + * Allows plugin implementers to use `f pipeTo self` and + * handle additional messages for implementing advanced features + */ + def receivePluginInternal: Actor.Receive = Actor.emptyBehavior //#journal-plugin-api } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala index 036e0367e5..219cdee103 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala @@ -4,17 +4,17 @@ package akka.persistence.journal -import scala.collection.immutable -import scala.concurrent._ -import scala.concurrent.duration.Duration -import scala.language.postfixOps - import akka.AkkaException import akka.actor._ import akka.pattern.ask import akka.persistence._ import akka.util._ +import scala.collection.immutable +import scala.concurrent._ +import scala.concurrent.duration.Duration +import scala.language.postfixOps + /** * INTERNAL API. * @@ -24,17 +24,18 @@ private[persistence] trait AsyncWriteProxy extends AsyncWriteJournal with Stash import AsyncWriteProxy._ import AsyncWriteTarget._ - private val initialized = super.receive + private var isInitialized = false private var store: ActorRef = _ - override def receive = { - case SetStore(ref) ⇒ - store = ref - unstashAll() - context.become(initialized) - case x ⇒ - stash() - } + override protected[akka] def aroundReceive(receive: Receive, msg: Any): Unit = + if (isInitialized) super.aroundReceive(receive, msg) + else msg match { + case SetStore(ref) ⇒ + store = ref + unstashAll() + isInitialized = true + case _ ⇒ stash() + } implicit def timeout: Timeout diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala index c3760d756b..eb774e5b0d 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala @@ -10,13 +10,16 @@ import scala.collection.JavaConverters._ import akka.persistence._ import akka.persistence.journal.{ SyncWriteJournal ⇒ SSyncWriteJournal } +import scala.concurrent.{ Await, Future } +import scala.concurrent.duration._ + /** * Java API: abstract journal, optimized for synchronous writes. */ abstract class SyncWriteJournal extends AsyncRecovery with SSyncWriteJournal with SyncWritePlugin { - final def writeMessages(messages: immutable.Seq[PersistentRepr]) = + final def writeMessages(messages: immutable.Seq[PersistentRepr]): Unit = doWriteMessages(messages.asJava) - final def deleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean) = + final def deleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Unit = doDeleteMessagesTo(persistenceId, toSequenceNr, permanent) } diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala index ecb1dec636..e1acbb992e 100644 --- a/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala @@ -14,44 +14,74 @@ import akka.persistence._ /** * Abstract snapshot store. */ -trait SnapshotStore extends Actor { +trait SnapshotStore extends Actor with ActorLogging { import SnapshotProtocol._ import context.dispatcher private val extension = Persistence(context.system) private val publish = extension.settings.internal.publishPluginCommands - final def receive = { + final def receive = receiveSnapshotStore.orElse[Any, Unit](receivePluginInternal) + + final val receiveSnapshotStore: Actor.Receive = { case LoadSnapshot(persistenceId, criteria, toSequenceNr) ⇒ - val p = sender() loadAsync(persistenceId, criteria.limit(toSequenceNr)) map { sso ⇒ LoadSnapshotResult(sso, toSequenceNr) } recover { case e ⇒ LoadSnapshotResult(None, toSequenceNr) - } pipeTo p + } pipeTo senderPersistentActor() + case SaveSnapshot(metadata, snapshot) ⇒ - val p = sender() val md = metadata.copy(timestamp = System.currentTimeMillis) saveAsync(md, snapshot) map { _ ⇒ SaveSnapshotSuccess(md) } recover { case e ⇒ SaveSnapshotFailure(metadata, e) - } to (self, p) - case evt @ SaveSnapshotSuccess(metadata) ⇒ - saved(metadata) - sender() ! evt // sender is persistentActor + } to (self, senderPersistentActor()) + + case evt: SaveSnapshotSuccess ⇒ + try tryReceivePluginInternal(evt) finally senderPersistentActor ! evt // sender is persistentActor case evt @ SaveSnapshotFailure(metadata, _) ⇒ - delete(metadata) - sender() ! evt // sender is persistentActor + try { + tryReceivePluginInternal(evt) + deleteAsync(metadata) + } finally senderPersistentActor() ! evt // sender is persistentActor + case d @ DeleteSnapshot(metadata) ⇒ - delete(metadata) + deleteAsync(metadata) map { + case _ ⇒ DeleteSnapshotSuccess(metadata) + } recover { + case e ⇒ DeleteSnapshotFailure(metadata, e) + } to (self, senderPersistentActor()) if (publish) context.system.eventStream.publish(d) + + case evt: DeleteSnapshotSuccess ⇒ + try tryReceivePluginInternal(evt) finally senderPersistentActor() ! evt + case evt: DeleteSnapshotFailure ⇒ + try tryReceivePluginInternal(evt) finally senderPersistentActor() ! evt + case d @ DeleteSnapshots(persistenceId, criteria) ⇒ - delete(persistenceId, criteria) + deleteAsync(persistenceId, criteria) map { + case _ ⇒ DeleteSnapshotsSuccess(criteria) + } recover { + case e ⇒ DeleteSnapshotsFailure(criteria, e) + } to (self, senderPersistentActor()) if (publish) context.system.eventStream.publish(d) + + case evt: DeleteSnapshotsFailure ⇒ + try tryReceivePluginInternal(evt) finally senderPersistentActor() ! evt // sender is persistentActor + case evt: DeleteSnapshotsSuccess ⇒ + try tryReceivePluginInternal(evt) finally senderPersistentActor() ! evt } + /** Documents intent that the sender() is expected to be the PersistentActor */ + @inline private final def senderPersistentActor(): ActorRef = sender() + + private def tryReceivePluginInternal(evt: Any): Unit = + if (receivePluginInternal.isDefinedAt(evt)) receivePluginInternal(evt) + //#snapshot-store-plugin-api + /** * Plugin API: asynchronously loads a snapshot. * @@ -68,20 +98,13 @@ trait SnapshotStore extends Actor { */ def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] - /** - * Plugin API: called after successful saving of a snapshot. - * - * @param metadata snapshot metadata. - */ - def saved(metadata: SnapshotMetadata) - /** * Plugin API: deletes the snapshot identified by `metadata`. * * @param metadata snapshot metadata. */ - def delete(metadata: SnapshotMetadata) + def deleteAsync(metadata: SnapshotMetadata): Future[Unit] /** * Plugin API: deletes all snapshots matching `criteria`. @@ -89,6 +112,13 @@ trait SnapshotStore extends Actor { * @param persistenceId id of the persistent actor. * @param criteria selection criteria for deleting. */ - def delete(persistenceId: String, criteria: SnapshotSelectionCriteria) + def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] + + /** + * Plugin API + * Allows plugin implementers to use `f pipeTo self` and + * handle additional messages for implementing advanced features + */ + def receivePluginInternal: Actor.Receive = Actor.emptyBehavior //#snapshot-store-plugin-api } diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/japi/SnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/japi/SnapshotStore.scala index 20839e4112..534423ca84 100644 --- a/akka-persistence/src/main/scala/akka/persistence/snapshot/japi/SnapshotStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/japi/SnapshotStore.scala @@ -4,11 +4,11 @@ package akka.persistence.snapshot.japi -import scala.concurrent.Future - -import akka.japi.{ Option ⇒ JOption } import akka.persistence._ import akka.persistence.snapshot.{ SnapshotStore ⇒ SSnapshotStore } +import akka.japi.Util._ + +import scala.concurrent.Future /** * Java API: abstract snapshot store. @@ -16,19 +16,16 @@ import akka.persistence.snapshot.{ SnapshotStore ⇒ SSnapshotStore } abstract class SnapshotStore extends SSnapshotStore with SnapshotStorePlugin { import context.dispatcher - final def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria) = - doLoadAsync(persistenceId, criteria).map(_.asScala) + override final def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = + doLoadAsync(persistenceId, criteria).map(option) - final def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = + override final def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = doSaveAsync(metadata, snapshot).map(Unit.unbox) - final def saved(metadata: SnapshotMetadata) = - onSaved(metadata) + override final def deleteAsync(metadata: SnapshotMetadata): Future[Unit] = + doDeleteAsync(metadata).map(_ ⇒ ()) - final def delete(metadata: SnapshotMetadata) = - doDelete(metadata) - - final def delete(persistenceId: String, criteria: SnapshotSelectionCriteria) = - doDelete(persistenceId: String, criteria: SnapshotSelectionCriteria) + override final def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = + doDeleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria).map(_ ⇒ ()) } diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala index 7b5c8e2792..4dc1ff6cf5 100644 --- a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala @@ -8,63 +8,81 @@ package akka.persistence.snapshot.local import java.io._ import java.net.{ URLDecoder, URLEncoder } +import akka.actor.ActorLogging +import akka.persistence._ +import akka.persistence.serialization._ +import akka.persistence.snapshot._ +import akka.serialization.SerializationExtension +import akka.util.ByteString.UTF_8 + import scala.collection.immutable import scala.concurrent.Future import scala.util._ -import akka.actor.ActorLogging -import akka.persistence._ -import akka.persistence.snapshot._ -import akka.persistence.serialization._ -import akka.serialization.SerializationExtension -import akka.util.ByteString.UTF_8 - /** - * INTERNAL API. + * INTERNAL API * * Local filesystem backed snapshot store. */ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLogging { private val FilenamePattern = """^snapshot-(.+)-(\d+)-(\d+)""".r + import akka.util.Helpers._ private val config = context.system.settings.config.getConfig("akka.persistence.snapshot-store.local") + private val maxLoadAttempts = config.getInt("max-load-attempts") + .requiring(_ > 1, "max-load-attempts must be >= 1") + private val streamDispatcher = context.system.dispatchers.lookup(config.getString("stream-dispatcher")) private val dir = new File(config.getString("dir")) private val serializationExtension = SerializationExtension(context.system) private var saving = immutable.Set.empty[SnapshotMetadata] // saving in progress - def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = { + override def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = { // // Heuristics: // - // Select youngest 3 snapshots that match upper bound. This may help in situations - // where saving of a snapshot could not be completed because of a JVM crash. Hence, - // an attempt to load that snapshot will fail but loading an older snapshot may - // succeed. + // Select youngest `maxLoadAttempts` snapshots that match upper bound. + // This may help in situations where saving of a snapshot could not be completed because of a JVM crash. + // Hence, an attempt to load that snapshot will fail but loading an older snapshot may succeed. // - // TODO: make number of loading attempts configurable - // - val metadata = snapshotMetadata(persistenceId, criteria).sorted.takeRight(3) + val metadata = snapshotMetadatas(persistenceId, criteria).sorted.takeRight(maxLoadAttempts) Future(load(metadata))(streamDispatcher) } - def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = { + override def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = { saving += metadata - Future(save(metadata, snapshot))(streamDispatcher) + val completion = Future(save(metadata, snapshot))(streamDispatcher) + completion } - def saved(metadata: SnapshotMetadata): Unit = { + override def deleteAsync(metadata: SnapshotMetadata): Future[Unit] = { saving -= metadata + Future { + // multiple snapshot files here mean that there were multiple snapshots for this seqNr, we delete all of them + // usually snapshot-stores would keep one snapshot per sequenceNr however here in the file-based one we timestamp + // snapshots and allow multiple to be kept around (for the same seqNr) if desired + snapshotFiles(metadata).map(_.delete()) + }(streamDispatcher).map(_ ⇒ ())(streamDispatcher) } - def delete(metadata: SnapshotMetadata): Unit = { - saving -= metadata - snapshotFile(metadata).delete() + override def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = { + val metadatas = snapshotMetadatas(persistenceId, criteria) + Future.sequence { + metadatas.map(deleteAsync) + }(collection.breakOut, streamDispatcher).map(_ ⇒ ())(streamDispatcher) } - def delete(persistenceId: String, criteria: SnapshotSelectionCriteria) = { - snapshotMetadata(persistenceId, criteria).foreach(delete) + override def receivePluginInternal: Receive = { + case SaveSnapshotSuccess(metadata) ⇒ saving -= metadata + case _: SaveSnapshotFailure ⇒ // ignore + case _: DeleteSnapshotsSuccess ⇒ // ignore + case _: DeleteSnapshotsFailure ⇒ // ignore + } + + private def snapshotFiles(metadata: SnapshotMetadata): immutable.Seq[File] = { + // pick all files for this persistenceId and sequenceNr, old journals could have created multiple entries with appended timestamps + snapshotDir.listFiles(new SnapshotSeqNrFilenameFilter(metadata.persistenceId, metadata.sequenceNr)).toVector } @scala.annotation.tailrec @@ -81,7 +99,7 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo protected def save(metadata: SnapshotMetadata, snapshot: Any): Unit = { val tmpFile = withOutputStream(metadata)(serialize(_, Snapshot(snapshot))) - tmpFile.renameTo(snapshotFile(metadata)) + tmpFile.renameTo(snapshotFileForWrite(metadata)) } protected def deserialize(inputStream: InputStream): Snapshot = @@ -91,21 +109,22 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo outputStream.write(serializationExtension.findSerializerFor(snapshot).toBinary(snapshot)) protected def withOutputStream(metadata: SnapshotMetadata)(p: (OutputStream) ⇒ Unit): File = { - val tmpFile = snapshotFile(metadata, extension = "tmp") + val tmpFile = snapshotFileForWrite(metadata, extension = "tmp") withStream(new BufferedOutputStream(new FileOutputStream(tmpFile)), p) tmpFile } private def withInputStream[T](metadata: SnapshotMetadata)(p: (InputStream) ⇒ T): T = - withStream(new BufferedInputStream(new FileInputStream(snapshotFile(metadata))), p) + withStream(new BufferedInputStream(new FileInputStream(snapshotFileForWrite(metadata))), p) private def withStream[A <: Closeable, B](stream: A, p: A ⇒ B): B = try { p(stream) } finally { stream.close() } - private def snapshotFile(metadata: SnapshotMetadata, extension: String = ""): File = + /** Only by persistenceId and sequenceNr, timestamp is informational - accomodates for 2.13.x series files */ + private def snapshotFileForWrite(metadata: SnapshotMetadata, extension: String = ""): File = new File(snapshotDir, s"snapshot-${URLEncoder.encode(metadata.persistenceId, UTF_8)}-${metadata.sequenceNr}-${metadata.timestamp}${extension}") - private def snapshotMetadata(persistenceId: String, criteria: SnapshotSelectionCriteria): immutable.Seq[SnapshotMetadata] = { + private def snapshotMetadatas(persistenceId: String, criteria: SnapshotSelectionCriteria): immutable.Seq[SnapshotMetadata] = { val files = snapshotDir.listFiles(new SnapshotFilenameFilter(persistenceId)) if (files eq null) Nil // if the dir was removed else files.map(_.getName).collect { @@ -128,11 +147,24 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo dir } - private class SnapshotFilenameFilter(persistenceId: String) extends FilenameFilter { - def accept(dir: File, name: String): Boolean = + private final class SnapshotFilenameFilter(persistenceId: String) extends FilenameFilter { + def accept(dir: File, name: String): Boolean = { name match { case FilenamePattern(pid, snr, tms) ⇒ pid.equals(URLEncoder.encode(persistenceId)) case _ ⇒ false } + } + } + + private final class SnapshotSeqNrFilenameFilter(persistenceId: String, sequenceNr: Long) extends FilenameFilter { + private final def matches(pid: String, snr: String): Boolean = + pid.equals(URLEncoder.encode(persistenceId)) && Try(snr.toLong == sequenceNr).getOrElse(false) + + def accept(dir: File, name: String): Boolean = + name match { + case FilenamePattern(pid, snr, tms) ⇒ matches(pid, snr) + case _ ⇒ false + } + } } diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala index acc3680a97..a1ee7b3d1c 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala @@ -4,18 +4,22 @@ package akka.persistence -import akka.actor.{ Props, ActorRef } -import akka.testkit.{ TestEvent, EventFilter, ImplicitSender, AkkaSpec } -import scala.concurrent.duration._ -import akka.persistence.snapshot.local.LocalSnapshotStore -import akka.persistence.serialization.Snapshot -import akka.event.Logging +import java.io.IOException +import akka.actor.{ ActorRef, Props } +import akka.event.Logging +import akka.persistence.snapshot.local.LocalSnapshotStore +import akka.testkit.{ EventFilter, ImplicitSender, TestEvent } + +import scala.concurrent.Future +import scala.concurrent.duration._ import scala.language.postfixOps object SnapshotFailureRobustnessSpec { case class Cmd(payload: String) + case class DeleteSnapshot(seqNr: Int) + case class DeleteSnapshots(criteria: SnapshotSelectionCriteria) class SaveSnapshotTestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) { override def receiveRecover: Receive = { @@ -30,6 +34,26 @@ object SnapshotFailureRobustnessSpec { } } + class DeleteSnapshotTestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) { + + // TODO do we call it "snapshot store" or "snapshot plugin", small inconsistency here + override def snapshotPluginId: String = + "akka.persistence.snapshot-store.local-delete-fail" + + override def receiveRecover: Receive = { + case SnapshotOffer(md, s) ⇒ probe ! ((md, s)) + case other ⇒ probe ! other + } + + override def receiveCommand = { + case Cmd(payload) ⇒ persist(payload)(_ ⇒ saveSnapshot(payload)) + case DeleteSnapshot(seqNr) ⇒ deleteSnapshot(seqNr) + case DeleteSnapshots(crit) ⇒ deleteSnapshots(crit) + case SaveSnapshotSuccess(md) ⇒ probe ! md.sequenceNr + case other ⇒ probe ! other + } + } + class LoadSnapshotTestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) { override def receiveRecover: Receive = { case SnapshotOffer(md, s) ⇒ probe ! ((md, s)) @@ -56,11 +80,24 @@ object SnapshotFailureRobustnessSpec { } else super.save(metadata, snapshot) } } + + class DeleteFailingLocalSnapshotStore extends LocalSnapshotStore { + override def deleteAsync(metadata: SnapshotMetadata): Future[Unit] = { + super.deleteAsync(metadata) // we actually delete it properly, but act as if it failed + Future.failed(new IOException("Failed to delete snapshot for some reason!")) + } + + override def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = { + super.deleteAsync(persistenceId, criteria) // we actually delete it properly, but act as if it failed + Future.failed(new IOException("Failed to delete snapshot for some reason!")) + } + } } class SnapshotFailureRobustnessSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "SnapshotFailureRobustnessSpec", serialization = "off", extraConfig = Some( """ akka.persistence.snapshot-store.local.class = "akka.persistence.SnapshotFailureRobustnessSpec$FailingLocalSnapshotStore" + akka.persistence.snapshot-store.local-delete-fail.class = "akka.persistence.SnapshotFailureRobustnessSpec$DeleteFailingLocalSnapshotStore" """))) with ImplicitSender { import SnapshotFailureRobustnessSpec._ @@ -95,5 +132,37 @@ class SnapshotFailureRobustnessSpec extends PersistenceSpec(PersistenceSpec.conf EventFilter.error(start = "Error loading snapshot ["))) } } + + "receive failure message when deleting a single snapshot fails" in { + val p = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, testActor)) + val persistenceId = name + + expectMsg(RecoveryCompleted) + p ! Cmd("hello") + expectMsg(1) + p ! DeleteSnapshot(1) + expectMsgPF() { + case DeleteSnapshotFailure(SnapshotMetadata(`persistenceId`, 1, timestamp), cause) ⇒ + // ok, expected failure + cause.getMessage should include("Failed to delete") + } + } + "receive failure message when bulk deleting snapshot fails" in { + val p = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, testActor)) + val persistenceId = name + + expectMsg(RecoveryCompleted) + p ! Cmd("hello") + expectMsg(1) + p ! Cmd("hola") + expectMsg(2) + val criteria = SnapshotSelectionCriteria(maxSequenceNr = 10) + p ! DeleteSnapshots(criteria) + expectMsgPF() { + case DeleteSnapshotsFailure(criteria, cause) ⇒ + // ok, expected failure + cause.getMessage should include("Failed to delete") + } + } } } diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotRecoveryLocalStoreSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotRecoveryLocalStoreSpec.scala index 961c0f8c45..776adf4d5d 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotRecoveryLocalStoreSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotRecoveryLocalStoreSpec.scala @@ -1,7 +1,7 @@ package akka.persistence -import akka.actor.{ Props, Actor, ActorRef } -import akka.testkit.{ TestProbe, ImplicitSender, AkkaSpec } +import akka.actor.{ ActorLogging, ActorRef, Props } +import akka.testkit.ImplicitSender object SnapshotRecoveryLocalStoreSpec { val persistenceId = "europe" @@ -21,13 +21,14 @@ object SnapshotRecoveryLocalStoreSpec { } } - class LoadSnapshotTestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) { + class LoadSnapshotTestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) + with ActorLogging { + def receiveCommand = { case _ ⇒ } def receiveRecover = { - case SnapshotOffer(md, s) ⇒ probe ! ((md, s)) - case other ⇒ probe ! other + case other ⇒ probe ! other } override def preStart() = () } @@ -45,7 +46,6 @@ class SnapshotRecoveryLocalStoreSpec extends PersistenceSpec(PersistenceSpec.con persistentActor1 ! TakeSnapshot persistentActor2 ! TakeSnapshot expectMsgAllOf(0L, 0L) - } "A persistent actor which is persisted at the same time as another actor whose persistenceId is an extension of the first " must { @@ -54,11 +54,7 @@ class SnapshotRecoveryLocalStoreSpec extends PersistenceSpec(PersistenceSpec.con val recoveringActor = system.actorOf(Props(classOf[LoadSnapshotTestPersistentActor], persistenceId, testActor)) recoveringActor ! Recover() - - expectMsgPF() { - case (SnapshotMetadata(pid, seqNo, timestamp), state) ⇒ - pid should ===(persistenceId) - } + expectMsgPF() { case SnapshotOffer(SnapshotMetadata(`persistenceId`, seqNo, timestamp), state) ⇒ } expectMsg(RecoveryCompleted) } diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala index aae856b423..350e45d160 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala @@ -31,9 +31,9 @@ object SnapshotSpec { class LoadSnapshotTestPersistentActor(name: String, probe: ActorRef) extends NamedPersistentActor(name) { override def receiveRecover: Receive = { - case payload: String ⇒ probe ! s"${payload}-${lastSequenceNr}" - case SnapshotOffer(md, s) ⇒ probe ! ((md, s)) - case other ⇒ probe ! other + case payload: String ⇒ probe ! s"${payload}-${lastSequenceNr}" + case offer @ SnapshotOffer(md, s) ⇒ probe ! offer + case other ⇒ probe ! other } override def receiveCommand = { @@ -42,8 +42,8 @@ object SnapshotSpec { persist(payload) { _ ⇒ probe ! s"${payload}-${lastSequenceNr}" } - case SnapshotOffer(md, s) ⇒ probe ! ((md, s)) - case other ⇒ probe ! other + case offer @ SnapshotOffer(md, s) ⇒ probe ! offer + case other ⇒ probe ! other } override def preStart() = () } @@ -54,7 +54,7 @@ object SnapshotSpec { class DeleteSnapshotTestPersistentActor(name: String, probe: ActorRef) extends LoadSnapshotTestPersistentActor(name, probe) { override def receiveCommand = receiveDelete orElse super.receiveCommand def receiveDelete: Receive = { - case Delete1(metadata) ⇒ deleteSnapshot(metadata.sequenceNr, metadata.timestamp) + case Delete1(metadata) ⇒ deleteSnapshot(metadata.sequenceNr) case DeleteN(criteria) ⇒ deleteSnapshots(criteria) } } @@ -88,7 +88,7 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn persistentActor ! Recover() expectMsgPF() { - case (SnapshotMetadata(`persistenceId`, 4, timestamp), state) ⇒ + case SnapshotOffer(SnapshotMetadata(`persistenceId`, 4, timestamp), state) ⇒ state should ===(List("a-1", "b-2", "c-3", "d-4").reverse) timestamp should be > (0L) } @@ -103,7 +103,7 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn persistentActor ! Recover(toSequenceNr = 3) expectMsgPF() { - case (SnapshotMetadata(`persistenceId`, 2, timestamp), state) ⇒ + case SnapshotOffer(SnapshotMetadata(`persistenceId`, 2, timestamp), state) ⇒ state should ===(List("a-1", "b-2").reverse) timestamp should be > (0L) } @@ -118,7 +118,7 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn persistentActor ! "done" expectMsgPF() { - case (SnapshotMetadata(`persistenceId`, 4, timestamp), state) ⇒ + case SnapshotOffer(SnapshotMetadata(`persistenceId`, 4, timestamp), state) ⇒ state should ===(List("a-1", "b-2", "c-3", "d-4").reverse) timestamp should be > (0L) } @@ -132,7 +132,7 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn persistentActor ! Recover(fromSnapshot = SnapshotSelectionCriteria(maxSequenceNr = 2)) expectMsgPF() { - case (SnapshotMetadata(`persistenceId`, 2, timestamp), state) ⇒ + case SnapshotOffer(SnapshotMetadata(`persistenceId`, 2, timestamp), state) ⇒ state should ===(List("a-1", "b-2").reverse) timestamp should be > (0L) } @@ -149,7 +149,7 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn persistentActor ! Recover(fromSnapshot = SnapshotSelectionCriteria(maxSequenceNr = 2), toSequenceNr = 3) expectMsgPF() { - case (SnapshotMetadata(`persistenceId`, 2, timestamp), state) ⇒ + case SnapshotOffer(SnapshotMetadata(`persistenceId`, 2, timestamp), state) ⇒ state should ===(List("a-1", "b-2").reverse) timestamp should be > (0L) } @@ -166,7 +166,7 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn expectMsg("c-3") expectMsg(RecoveryCompleted) } - "support single message deletions" in { + "support single snapshot deletions" in { val deleteProbe = TestProbe() val persistentActor1 = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, testActor)) @@ -179,7 +179,7 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn persistentActor1 ! "done" val metadata = expectMsgPF() { - case (md @ SnapshotMetadata(`persistenceId`, 4, _), state) ⇒ + case SnapshotOffer(md @ SnapshotMetadata(`persistenceId`, 4, _), state) ⇒ state should ===(List("a-1", "b-2", "c-3", "d-4").reverse) md } @@ -188,13 +188,14 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn persistentActor1 ! Delete1(metadata) deleteProbe.expectMsgType[DeleteSnapshot] + expectMsgPF() { case m @ DeleteSnapshotSuccess(SnapshotMetadata(`persistenceId`, 4, _)) ⇒ } // recover persistentActor from 2nd snapshot (3rd was deleted) plus replayed messages val persistentActor2 = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, testActor)) persistentActor2 ! Recover(toSequenceNr = 4) - expectMsgPF() { - case (md @ SnapshotMetadata(`persistenceId`, 2, _), state) ⇒ + expectMsgPF(hint = "" + SnapshotOffer(SnapshotMetadata(`persistenceId`, 2, 0), null)) { + case SnapshotOffer(md @ SnapshotMetadata(`persistenceId`, 2, _), state) ⇒ state should ===(List("a-1", "b-2").reverse) md } @@ -202,7 +203,7 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn expectMsg("d-4") expectMsg(RecoveryCompleted) } - "support bulk message deletions" in { + "support bulk snapshot deletions" in { val deleteProbe = TestProbe() val persistentActor1 = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, testActor)) @@ -212,13 +213,15 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn // recover persistentActor and the delete first three (= all) snapshots persistentActor1 ! Recover(toSequenceNr = 4) - persistentActor1 ! DeleteN(SnapshotSelectionCriteria(maxSequenceNr = 4)) + val criteria = SnapshotSelectionCriteria(maxSequenceNr = 4) + persistentActor1 ! DeleteN(criteria) expectMsgPF() { - case (md @ SnapshotMetadata(`persistenceId`, 4, _), state) ⇒ + case SnapshotOffer(md @ SnapshotMetadata(`persistenceId`, 4, _), state) ⇒ state should ===(List("a-1", "b-2", "c-3", "d-4").reverse) } expectMsg(RecoveryCompleted) deleteProbe.expectMsgType[DeleteSnapshots] + expectMsgPF() { case DeleteSnapshotsSuccess(`criteria`) ⇒ } // recover persistentActor from replayed messages (all snapshots deleted) val persistentActor2 = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, testActor)) diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java index b28046041e..de5f85de5f 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java @@ -5,6 +5,7 @@ package doc; //#plugin-imports +import akka.dispatch.Futures; import akka.persistence.*; import akka.persistence.journal.japi.*; import akka.persistence.snapshot.japi.*; @@ -15,8 +16,8 @@ import akka.persistence.journal.leveldb.SharedLeveldbJournal; import akka.persistence.journal.leveldb.SharedLeveldbStore; import akka.japi.pf.ReceiveBuilder; import scala.concurrent.Future; -import akka.japi.Option; import akka.japi.Procedure; +import java.util.Optional; public class LambdaPersistencePluginDocTest { @@ -55,7 +56,7 @@ public class LambdaPersistencePluginDocTest { class MySnapshotStore extends SnapshotStore { @Override - public Future> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria) { + public Future> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria) { return null; } @@ -65,15 +66,13 @@ public class LambdaPersistencePluginDocTest { } @Override - public void onSaved(SnapshotMetadata metadata) throws Exception { + public Future doDeleteAsync(SnapshotMetadata metadata) { + return Futures.successful(null); } @Override - public void doDelete(SnapshotMetadata metadata) throws Exception { - } - - @Override - public void doDelete(String persistenceId, SnapshotSelectionCriteria criteria) throws Exception { + public Future doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria) { + return Futures.successful(null); } }