!per #17755 removes the saved callback in plugins and adds receive
This commit is contained in:
parent
541ac83b10
commit
2a5161ff6f
16 changed files with 238 additions and 141 deletions
|
|
@ -8,8 +8,8 @@ import akka.event.Logging;
|
|||
import akka.event.Logging.LoggerInitialized;
|
||||
import akka.japi.Creator;
|
||||
import akka.japi.Pair;
|
||||
import akka.japi.Util;
|
||||
import akka.japi.tuple.Tuple22;
|
||||
import akka.japi.tuple.Tuple3;
|
||||
import akka.japi.tuple.Tuple4;
|
||||
import akka.routing.GetRoutees;
|
||||
import akka.routing.FromConfig;
|
||||
|
|
@ -20,6 +20,10 @@ import akka.testkit.TestProbe;
|
|||
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import scala.Option;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class JavaAPI {
|
||||
|
|
@ -133,6 +137,15 @@ public class JavaAPI {
|
|||
Tuple22.create(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mustBeAbleToCreateOptionFromOptional() {
|
||||
Option<Object> empty = Util.option(Optional.ofNullable(null));
|
||||
assertTrue(empty.isEmpty());
|
||||
|
||||
Option<String> full = Util.option(Optional.ofNullable("hello"));
|
||||
assertTrue(full.isDefined());
|
||||
}
|
||||
|
||||
public static class ActorWithConstructorParams extends UntypedActor {
|
||||
|
||||
private final String a;
|
||||
|
|
|
|||
|
|
@ -245,4 +245,9 @@ object Util {
|
|||
*/
|
||||
def immutableIndexedSeq[T](iterable: java.lang.Iterable[T]): immutable.IndexedSeq[T] =
|
||||
immutableSeq(iterable).toVector
|
||||
|
||||
// TODO in case we decide to pull in scala-java8-compat methods below could be removed - https://github.com/akka/akka/issues/16247
|
||||
|
||||
def option[T](jOption: java.util.Optional[T]): scala.Option[T] =
|
||||
scala.Option(jOption.orElse(null.asInstanceOf[T]))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,6 +7,8 @@ package docs.persistence;
|
|||
import java.io.File;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import akka.actor.*;
|
||||
import akka.dispatch.Futures;
|
||||
import com.typesafe.config.Config;
|
||||
|
|
@ -63,7 +65,7 @@ public class PersistencePluginDocTest {
|
|||
|
||||
class MySnapshotStore extends SnapshotStore {
|
||||
@Override
|
||||
public Future<Option<SelectedSnapshot>> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria) {
|
||||
public Future<Optional<SelectedSnapshot>> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria) {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
@ -73,16 +75,12 @@ public class PersistencePluginDocTest {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onSaved(SnapshotMetadata metadata) throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Void> doDelete(SnapshotMetadata metadata) throws Exception {
|
||||
public Future<Void> doDeleteAsync(SnapshotMetadata metadata) {
|
||||
return Futures.successful(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Void> doDelete(String persistenceId, SnapshotSelectionCriteria criteria) throws Exception {
|
||||
public Future<Void> doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria) {
|
||||
return Futures.successful(null);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -404,9 +404,11 @@ saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay a
|
|||
Snapshot deletion
|
||||
-----------------
|
||||
|
||||
A persistent actor can delete individual snapshots by calling the ``deleteSnapshot`` method with the sequence number and the
|
||||
timestamp of a snapshot as argument. To bulk-delete snapshots matching ``SnapshotSelectionCriteria``, persistent actors should
|
||||
use the ``deleteSnapshots`` method.
|
||||
A persistent actor can delete individual snapshots by calling the ``deleteSnapshot`` method with the sequence number of
|
||||
when the snapshot was taken.
|
||||
|
||||
To bulk-delete a range of snapshots matching ``SnapshotSelectionCriteria``,
|
||||
persistent actors should use the ``deleteSnapshots`` method.
|
||||
|
||||
.. _at-least-once-delivery-java:
|
||||
|
||||
|
|
|
|||
|
|
@ -170,29 +170,6 @@ Default interval for TestKit.awaitAssert changed to 100 ms
|
|||
Default check interval changed from 800 ms to 100 ms. You can define the interval explicitly if you need a
|
||||
longer interval.
|
||||
|
||||
Akka Persistence
|
||||
================
|
||||
|
||||
Mandatory persistenceId
|
||||
-----------------------
|
||||
|
||||
It is now mandatory to define the ``persistenceId`` in subclasses of ``PersistentActor``, ``UntypedPersistentActor``
|
||||
and ``AbstractPersistentId``.
|
||||
|
||||
The rationale behind this change being stricter de-coupling of your Actor hierarchy and the logical
|
||||
"which persistent entity this actor represents".
|
||||
|
||||
In case you want to preserve the old behavior of providing the actor's path as the default ``persistenceId``, you can easily
|
||||
implement it yourself either as a helper trait or simply by overriding ``persistenceId`` as follows::
|
||||
|
||||
override def persistenceId = self.path.toStringWithoutAddress
|
||||
|
||||
Persist sequence of events
|
||||
--------------------------
|
||||
|
||||
The ``persist`` method that takes a ``Seq`` (Scala) or ``Iterable`` (Java) of events parameter was deprecated and
|
||||
renamed to ``persistAll`` to avoid mistakes of persisting other collection types as one single event by calling
|
||||
the overloaded ``persist(event)`` method.
|
||||
|
||||
Secure Cookies
|
||||
==============
|
||||
|
|
@ -315,3 +292,96 @@ actor external actor of how to allocate shards or rebalance shards.
|
|||
|
||||
For the synchronous case you can return the result via ``scala.concurrent.Future.successful`` in Scala or
|
||||
``akka.dispatch.Futures.successful`` in Java.
|
||||
|
||||
Akka Persistence
|
||||
================
|
||||
|
||||
Mendatory persistenceId
|
||||
-----------------------
|
||||
|
||||
It is now mandatory to define the ``persistenceId`` in subclasses of ``PersistentActor``, ``UntypedPersistentActor``
|
||||
and ``AbstractPersistentId``.
|
||||
|
||||
The rationale behind this change being stricter de-coupling of your Actor hierarchy and the logical
|
||||
"which persistent entity this actor represents".
|
||||
|
||||
In case you want to preserve the old behavior of providing the actor's path as the default ``persistenceId``, you can easily
|
||||
implement it yourself either as a helper trait or simply by overriding ``persistenceId`` as follows::
|
||||
|
||||
override def persistenceId = self.path.toStringWithoutAddress
|
||||
|
||||
|
||||
Persist sequence of events
|
||||
--------------------------
|
||||
|
||||
The ``persist`` method that takes a ``Seq`` (Scala) or ``Iterable`` (Java) of events parameter was deprecated and
|
||||
renamed to ``persistAll`` to avoid mistakes of persisting other collection types as one single event by calling
|
||||
the overloaded ``persist(event)`` method.
|
||||
|
||||
Persistence Plugin APIs
|
||||
=======================
|
||||
|
||||
SnapshotStore: Snapshots can now be deleted asynchronously (and report failures)
|
||||
--------------------------------------------------------------------------------
|
||||
Previously the ``SnapshotStore`` plugin SPI did not allow for asynchronous deletion of snapshots,
|
||||
and failures of deleting a snapshot may have been even silently ignored.
|
||||
|
||||
Now ``SnapshotStore`` must return a ``Future`` representing the deletion of the snapshot.
|
||||
If this future completes successfully the ``PersistentActor`` which initiated the snapshotting will
|
||||
be notified via an ``DeleteSnapshotSuccess`` message. If the deletion fails for some reason a ``DeleteSnapshotFailure``
|
||||
will be sent to the actor instead.
|
||||
|
||||
For ``criteria`` based deletion of snapshots (``def deleteSnapshots(criteria: SnapshotSelectionCriteria)``) equivalent
|
||||
``DeleteSnapshotsSuccess`` and ``DeleteSnapshotsFailure`` messages are sent, which contain the specified criteria,
|
||||
instead of ``SnapshotMetadata`` as is the case with the single snapshot deletion messages.
|
||||
|
||||
SnapshotStore: Removed 'saved' callback
|
||||
---------------------------------------
|
||||
Snapshot Stores previously were required to implement a ``def saved(meta: SnapshotMetadata): Unit`` method which
|
||||
would be called upon successful completion of a ``saveAsync`` (``doSaveAsync`` in Java API) snapshot write.
|
||||
|
||||
Currently all journals and snapshot stores perform asynchronous writes and deletes, thus all could potentially benefit
|
||||
from such callback methods. The only gain these callback give over composing an ``onComplete`` over ``Future`` returned
|
||||
by the journal or snapshot store is that it is executed in the Actors context, thus it can safely (without additional
|
||||
synchronization modify its internal state - for example a "pending writes" counter).
|
||||
|
||||
However, this feature was not used by many plugins, and expanding the API to accomodate all callbacks would have grown
|
||||
the API a lot. Instead, Akka Persistence 2.4.x introduces an additional (optionally overrideable)
|
||||
``receivePluginInternal:Actor.Receive`` method in the plugin API, which can be used for handling those as well as any custom messages
|
||||
that are sent to the plugin Actor (imagine use cases like "wake up and continue reading" or custom protocols which your
|
||||
specialised journal can implement).
|
||||
|
||||
Implementations using the previous feature should adjust their code as follows::
|
||||
|
||||
// previously
|
||||
class MySnapshots extends SnapshotStore {
|
||||
// old API:
|
||||
// def saved(meta: SnapshotMetadata): Unit = doThings()
|
||||
|
||||
// new API:
|
||||
def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = {
|
||||
// completion or failure of the returned future triggers internal messages in receivePluginInternal
|
||||
val f: Future[Unit] = ???
|
||||
|
||||
// custom messages can be piped to self in order to be received in receivePluginInternal
|
||||
f.map(MyCustomMessage(_)) pipeTo self
|
||||
|
||||
f
|
||||
}
|
||||
|
||||
def receivePluginInternal = {
|
||||
case SaveSnapshotSuccess(metadata) => doThings()
|
||||
case MyCustomMessage(data) => doOtherThings()
|
||||
}
|
||||
|
||||
// ...
|
||||
}
|
||||
|
||||
SnapshotStore: Java 8 Optional used in Java plugin APIs
|
||||
-------------------------------------------------------
|
||||
In places where previously ``akka.japi.Option`` was used in Java APIs, including the return type of ``doLoadAsync``,
|
||||
the Java 8 provided ``Optional`` type is used now.
|
||||
|
||||
Please remember that when creating an ``java.util.Optional`` instance from a (possibly) ``null`` value you will want to
|
||||
use the non-throwing ``Optional.fromNullable`` method, which converts a ``null`` into a ``None`` value - which is
|
||||
slightly different than its Scala counterpart (where ``Option.apply(null)`` returns ``None``).
|
||||
|
|
@ -203,17 +203,3 @@ To continue using LevelDB based persistence plugins it is now required for relat
|
|||
to include an additional explicit dependency declaration for the LevelDB artifacts.
|
||||
This change allows production akka deployments to avoid need for the LevelDB provisioning.
|
||||
Please see persistence extension ``reference.conf`` for details.
|
||||
|
||||
SnapshotStore: Snapshots can now be deleted asynchronously (and report failures)
|
||||
================================================================================
|
||||
Previously the ``SnapshotStore`` plugin SPI did not allow for asynchronous deletion of snapshots,
|
||||
and failures of deleting a snapshot may have been even silently ignored.
|
||||
|
||||
Now ``SnapshotStore``s must return a ``Future`` representing the deletion of the snapshot.
|
||||
If this future completes successfully the ``PersistentActor`` which initiated the snapshotting will
|
||||
be notified via an ``DeleteSnapshotSuccess`` message. If the deletion fails for some reason a ``DeleteSnapshotFailure``
|
||||
will be sent to the actor instead.
|
||||
|
||||
For ``criteria`` based deletion of snapshots (``def deleteSnapshots(criteria: SnapshotSelectionCriteria)``) equivalent
|
||||
``DeleteSnapshotsSuccess`` and ``DeleteSnapshotsFailure`` messages are sent, which contain the specified criteria,
|
||||
instead of ``SnapshotMetadata`` as is the case with the single snapshot deletion messages.
|
||||
|
|
@ -4,6 +4,7 @@
|
|||
|
||||
package docs.persistence
|
||||
|
||||
import akka.actor.Actor.Receive
|
||||
import akka.actor.ActorSystem
|
||||
import akka.testkit.TestKit
|
||||
import com.typesafe.config._
|
||||
|
|
@ -132,15 +133,20 @@ class MyJournal extends AsyncWriteJournal {
|
|||
replayCallback: (PersistentRepr) => Unit): Future[Unit] = ???
|
||||
def asyncReadHighestSequenceNr(persistenceId: String,
|
||||
fromSequenceNr: Long): Future[Long] = ???
|
||||
|
||||
// optionally override:
|
||||
override def receivePluginInternal: Receive = super.receivePluginInternal
|
||||
}
|
||||
|
||||
class MySnapshotStore extends SnapshotStore {
|
||||
def loadAsync(persistenceId: String,
|
||||
criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = ???
|
||||
def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = ???
|
||||
def saved(metadata: SnapshotMetadata): Unit = ???
|
||||
def deleteAsync(metadata: SnapshotMetadata): Future[Unit] = ???
|
||||
def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = ???
|
||||
|
||||
// optionally override:
|
||||
override def receivePluginInternal: Receive = super.receivePluginInternal
|
||||
}
|
||||
|
||||
object PersistenceTCKDoc {
|
||||
|
|
|
|||
|
|
@ -395,9 +395,11 @@ saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay a
|
|||
Snapshot deletion
|
||||
-----------------
|
||||
|
||||
A persistent actor can delete individual snapshots by calling the ``deleteSnapshot`` method with the sequence number and the
|
||||
timestamp of a snapshot as argument. To bulk-delete snapshots matching ``SnapshotSelectionCriteria``, persistent actors should
|
||||
use the ``deleteSnapshots`` method.
|
||||
A persistent actor can delete individual snapshots by calling the ``deleteSnapshot`` method with the sequence number of
|
||||
when the snapshot was taken.
|
||||
|
||||
To bulk-delete a range of snapshots matching ``SnapshotSelectionCriteria``,
|
||||
persistent actors should use the ``deleteSnapshots`` method.
|
||||
|
||||
.. _at-least-once-delivery:
|
||||
|
||||
|
|
|
|||
|
|
@ -4,10 +4,12 @@
|
|||
|
||||
package akka.persistence.snapshot.japi;
|
||||
|
||||
import akka.persistence.SelectedSnapshot;
|
||||
import akka.persistence.SnapshotMetadata;
|
||||
import akka.persistence.SnapshotSelectionCriteria;
|
||||
import scala.concurrent.Future;
|
||||
|
||||
import akka.japi.Option;
|
||||
import akka.persistence.*;
|
||||
import java.util.Optional;
|
||||
|
||||
interface SnapshotStorePlugin {
|
||||
//#snapshot-store-plugin-api
|
||||
|
|
@ -19,7 +21,7 @@ interface SnapshotStorePlugin {
|
|||
* @param criteria
|
||||
* selection criteria for loading.
|
||||
*/
|
||||
Future<Option<SelectedSnapshot>> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria);
|
||||
Future<Optional<SelectedSnapshot>> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria);
|
||||
|
||||
/**
|
||||
* Java API, Plugin API: asynchronously saves a snapshot.
|
||||
|
|
@ -31,21 +33,13 @@ interface SnapshotStorePlugin {
|
|||
*/
|
||||
Future<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot);
|
||||
|
||||
/**
|
||||
* Java API, Plugin API: called after successful saving of a snapshot.
|
||||
*
|
||||
* @param metadata
|
||||
* snapshot metadata.
|
||||
*/
|
||||
void onSaved(SnapshotMetadata metadata) throws Exception;
|
||||
|
||||
/**
|
||||
* Java API, Plugin API: deletes the snapshot identified by `metadata`.
|
||||
*
|
||||
* @param metadata
|
||||
* snapshot metadata.
|
||||
*/
|
||||
Future<Void> doDelete(SnapshotMetadata metadata) throws Exception;
|
||||
Future<Void> doDeleteAsync(SnapshotMetadata metadata);
|
||||
|
||||
/**
|
||||
* Java API, Plugin API: deletes all snapshots matching `criteria`.
|
||||
|
|
@ -55,6 +49,6 @@ interface SnapshotStorePlugin {
|
|||
* @param criteria
|
||||
* selection criteria for deleting.
|
||||
*/
|
||||
Future<Void> doDelete(String persistenceId, SnapshotSelectionCriteria criteria) throws Exception;
|
||||
Future<Void> doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria);
|
||||
//#snapshot-store-plugin-api
|
||||
}
|
||||
|
|
|
|||
|
|
@ -27,7 +27,9 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
|
|||
private val resequencer = context.actorOf(Props[Resequencer]())
|
||||
private var resequencerCounter = 1L
|
||||
|
||||
def receive = {
|
||||
final def receive = receiveWriteJournal orElse receivePluginInternal
|
||||
|
||||
final val receiveWriteJournal: Actor.Receive = {
|
||||
case WriteMessages(messages, persistentActor, actorInstanceId) ⇒
|
||||
val cctr = resequencerCounter
|
||||
def resequence(f: PersistentRepr ⇒ Any) = messages.zipWithIndex.foreach {
|
||||
|
|
@ -43,6 +45,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
|
|||
resequence(WriteMessageFailure(_, e, actorInstanceId))
|
||||
}
|
||||
resequencerCounter += messages.length + 1
|
||||
|
||||
case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor, replayDeleted) ⇒
|
||||
// Send replayed messages and replay result to persistentActor directly. No need
|
||||
// to resequence replayed messages relative to written and looped messages.
|
||||
|
|
@ -58,6 +61,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
|
|||
} pipeTo persistentActor onSuccess {
|
||||
case _ if publish ⇒ context.system.eventStream.publish(r)
|
||||
}
|
||||
|
||||
case ReadHighestSequenceNr(fromSequenceNr, persistenceId, persistentActor) ⇒
|
||||
// Send read highest sequence number to persistentActor directly. No need
|
||||
// to resequence the result relative to written and looped messages.
|
||||
|
|
@ -66,6 +70,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
|
|||
} recover {
|
||||
case e ⇒ ReadHighestSequenceNrFailure(e)
|
||||
} pipeTo persistentActor
|
||||
|
||||
case d @ DeleteMessagesTo(persistenceId, toSequenceNr, permanent) ⇒
|
||||
asyncDeleteMessagesTo(persistenceId, toSequenceNr, permanent) onComplete {
|
||||
case Success(_) ⇒ if (publish) context.system.eventStream.publish(d)
|
||||
|
|
@ -87,6 +92,14 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
|
|||
* as deleted, otherwise they are permanently deleted.
|
||||
*/
|
||||
def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit]
|
||||
|
||||
/**
|
||||
* Plugin API
|
||||
*
|
||||
* Allows plugin implementers to use `f pipeTo self` and
|
||||
* handle additional messages for implementing advanced features
|
||||
*/
|
||||
def receivePluginInternal: Actor.Receive = Actor.emptyBehavior
|
||||
//#journal-plugin-api
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,17 +4,17 @@
|
|||
|
||||
package akka.persistence.journal
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent._
|
||||
import scala.concurrent.duration.Duration
|
||||
import scala.language.postfixOps
|
||||
|
||||
import akka.AkkaException
|
||||
import akka.actor._
|
||||
import akka.pattern.ask
|
||||
import akka.persistence._
|
||||
import akka.util._
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent._
|
||||
import scala.concurrent.duration.Duration
|
||||
import scala.language.postfixOps
|
||||
|
||||
/**
|
||||
* INTERNAL API.
|
||||
*
|
||||
|
|
@ -24,17 +24,18 @@ private[persistence] trait AsyncWriteProxy extends AsyncWriteJournal with Stash
|
|||
import AsyncWriteProxy._
|
||||
import AsyncWriteTarget._
|
||||
|
||||
private val initialized = super.receive
|
||||
private var isInitialized = false
|
||||
private var store: ActorRef = _
|
||||
|
||||
override def receive = {
|
||||
case SetStore(ref) ⇒
|
||||
store = ref
|
||||
unstashAll()
|
||||
context.become(initialized)
|
||||
case x ⇒
|
||||
stash()
|
||||
}
|
||||
override protected[akka] def aroundReceive(receive: Receive, msg: Any): Unit =
|
||||
if (isInitialized) super.aroundReceive(receive, msg)
|
||||
else msg match {
|
||||
case SetStore(ref) ⇒
|
||||
store = ref
|
||||
unstashAll()
|
||||
isInitialized = true
|
||||
case _ ⇒ stash()
|
||||
}
|
||||
|
||||
implicit def timeout: Timeout
|
||||
|
||||
|
|
|
|||
|
|
@ -21,54 +21,67 @@ trait SnapshotStore extends Actor with ActorLogging {
|
|||
private val extension = Persistence(context.system)
|
||||
private val publish = extension.settings.internal.publishPluginCommands
|
||||
|
||||
final def receive = {
|
||||
final def receive = receiveSnapshotStore.orElse[Any, Unit](receivePluginInternal)
|
||||
|
||||
final val receiveSnapshotStore: Actor.Receive = {
|
||||
case LoadSnapshot(persistenceId, criteria, toSequenceNr) ⇒
|
||||
val p = sender()
|
||||
loadAsync(persistenceId, criteria.limit(toSequenceNr)) map {
|
||||
sso ⇒ LoadSnapshotResult(sso, toSequenceNr)
|
||||
} recover {
|
||||
case e ⇒ LoadSnapshotResult(None, toSequenceNr)
|
||||
} pipeTo p
|
||||
} pipeTo senderPersistentActor()
|
||||
|
||||
case SaveSnapshot(metadata, snapshot) ⇒
|
||||
val p = sender()
|
||||
val md = metadata.copy(timestamp = System.currentTimeMillis)
|
||||
saveAsync(md, snapshot) map {
|
||||
_ ⇒ SaveSnapshotSuccess(md)
|
||||
} recover {
|
||||
case e ⇒ SaveSnapshotFailure(metadata, e)
|
||||
} to (self, p)
|
||||
|
||||
case evt @ SaveSnapshotSuccess(metadata) ⇒
|
||||
try saved(metadata) finally sender() ! evt // sender is persistentActor
|
||||
} to (self, senderPersistentActor())
|
||||
|
||||
case evt: SaveSnapshotSuccess ⇒
|
||||
try tryReceivePluginInternal(evt) finally senderPersistentActor ! evt // sender is persistentActor
|
||||
case evt @ SaveSnapshotFailure(metadata, _) ⇒
|
||||
try deleteAsync(metadata) finally sender() ! evt // sender is persistentActor
|
||||
try {
|
||||
tryReceivePluginInternal(evt)
|
||||
deleteAsync(metadata)
|
||||
} finally senderPersistentActor() ! evt // sender is persistentActor
|
||||
|
||||
case d @ DeleteSnapshot(metadata) ⇒
|
||||
val p = sender()
|
||||
deleteAsync(metadata) map {
|
||||
case _ ⇒
|
||||
log.warning("deleting by: " + d)
|
||||
DeleteSnapshotSuccess(metadata)
|
||||
case _ ⇒ DeleteSnapshotSuccess(metadata)
|
||||
} recover {
|
||||
case e ⇒ DeleteSnapshotFailure(metadata, e)
|
||||
} pipeTo p onComplete {
|
||||
case _ if publish ⇒ context.system.eventStream.publish(d)
|
||||
}
|
||||
} to (self, senderPersistentActor())
|
||||
if (publish) context.system.eventStream.publish(d)
|
||||
|
||||
case evt: DeleteSnapshotSuccess ⇒
|
||||
try tryReceivePluginInternal(evt) finally senderPersistentActor() ! evt
|
||||
case evt: DeleteSnapshotFailure ⇒
|
||||
try tryReceivePluginInternal(evt) finally senderPersistentActor() ! evt
|
||||
|
||||
case d @ DeleteSnapshots(persistenceId, criteria) ⇒
|
||||
val p = sender()
|
||||
deleteAsync(persistenceId, criteria) map {
|
||||
case _ ⇒ DeleteSnapshotsSuccess(criteria)
|
||||
} recover {
|
||||
case e ⇒ DeleteSnapshotsFailure(criteria, e)
|
||||
} pipeTo p onComplete {
|
||||
case _ if publish ⇒ context.system.eventStream.publish(d)
|
||||
}
|
||||
} to (self, senderPersistentActor())
|
||||
if (publish) context.system.eventStream.publish(d)
|
||||
|
||||
case evt: DeleteSnapshotsFailure ⇒
|
||||
try tryReceivePluginInternal(evt) finally senderPersistentActor() ! evt // sender is persistentActor
|
||||
case evt: DeleteSnapshotsSuccess ⇒
|
||||
try tryReceivePluginInternal(evt) finally senderPersistentActor() ! evt
|
||||
}
|
||||
|
||||
/** Documents intent that the sender() is expected to be the PersistentActor */
|
||||
@inline private final def senderPersistentActor(): ActorRef = sender()
|
||||
|
||||
private def tryReceivePluginInternal(evt: Any): Unit =
|
||||
if (receivePluginInternal.isDefinedAt(evt)) receivePluginInternal(evt)
|
||||
|
||||
//#snapshot-store-plugin-api
|
||||
|
||||
/**
|
||||
* Plugin API: asynchronously loads a snapshot.
|
||||
*
|
||||
|
|
@ -85,13 +98,6 @@ trait SnapshotStore extends Actor with ActorLogging {
|
|||
*/
|
||||
def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit]
|
||||
|
||||
/**
|
||||
* Plugin API: called after successful saving of a snapshot.
|
||||
*
|
||||
* @param metadata snapshot metadata.
|
||||
*/
|
||||
def saved(metadata: SnapshotMetadata): Unit
|
||||
|
||||
/**
|
||||
* Plugin API: deletes the snapshot identified by `metadata`.
|
||||
*
|
||||
|
|
@ -107,5 +113,12 @@ trait SnapshotStore extends Actor with ActorLogging {
|
|||
* @param criteria selection criteria for deleting.
|
||||
*/
|
||||
def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit]
|
||||
|
||||
/**
|
||||
* Plugin API
|
||||
* Allows plugin implementers to use `f pipeTo self` and
|
||||
* handle additional messages for implementing advanced features
|
||||
*/
|
||||
def receivePluginInternal: Actor.Receive = Actor.emptyBehavior
|
||||
//#snapshot-store-plugin-api
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,9 +4,9 @@
|
|||
|
||||
package akka.persistence.snapshot.japi
|
||||
|
||||
import akka.japi.{ Option ⇒ JOption }
|
||||
import akka.persistence._
|
||||
import akka.persistence.snapshot.{ SnapshotStore ⇒ SSnapshotStore }
|
||||
import akka.japi.Util._
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
||||
|
|
@ -16,19 +16,16 @@ import scala.concurrent.Future
|
|||
abstract class SnapshotStore extends SSnapshotStore with SnapshotStorePlugin {
|
||||
import context.dispatcher
|
||||
|
||||
final def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria) =
|
||||
doLoadAsync(persistenceId, criteria).map(_.asScala)
|
||||
override final def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] =
|
||||
doLoadAsync(persistenceId, criteria).map(option)
|
||||
|
||||
final def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] =
|
||||
override final def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] =
|
||||
doSaveAsync(metadata, snapshot).map(Unit.unbox)
|
||||
|
||||
final def saved(metadata: SnapshotMetadata): Unit =
|
||||
onSaved(metadata)
|
||||
override final def deleteAsync(metadata: SnapshotMetadata): Future[Unit] =
|
||||
doDeleteAsync(metadata).map(_ ⇒ ())
|
||||
|
||||
final def delete(metadata: SnapshotMetadata): Future[Unit] =
|
||||
doDelete(metadata).map(_ ⇒ ())
|
||||
|
||||
final def delete(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] =
|
||||
doDelete(persistenceId: String, criteria: SnapshotSelectionCriteria).map(_ ⇒ ())
|
||||
override final def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] =
|
||||
doDeleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria).map(_ ⇒ ())
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ import scala.concurrent.Future
|
|||
import scala.util._
|
||||
|
||||
/**
|
||||
* INTERNAL API.
|
||||
* INTERNAL API
|
||||
*
|
||||
* Local filesystem backed snapshot store.
|
||||
*/
|
||||
|
|
@ -38,7 +38,7 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo
|
|||
private val serializationExtension = SerializationExtension(context.system)
|
||||
private var saving = immutable.Set.empty[SnapshotMetadata] // saving in progress
|
||||
|
||||
def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = {
|
||||
override def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = {
|
||||
//
|
||||
// Heuristics:
|
||||
//
|
||||
|
|
@ -50,16 +50,13 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo
|
|||
Future(load(metadata))(streamDispatcher)
|
||||
}
|
||||
|
||||
def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = {
|
||||
override def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = {
|
||||
saving += metadata
|
||||
Future(save(metadata, snapshot))(streamDispatcher)
|
||||
val completion = Future(save(metadata, snapshot))(streamDispatcher)
|
||||
completion
|
||||
}
|
||||
|
||||
def saved(metadata: SnapshotMetadata): Unit = {
|
||||
saving -= metadata
|
||||
}
|
||||
|
||||
def deleteAsync(metadata: SnapshotMetadata): Future[Unit] = {
|
||||
override def deleteAsync(metadata: SnapshotMetadata): Future[Unit] = {
|
||||
saving -= metadata
|
||||
Future {
|
||||
// multiple snapshot files here mean that there were multiple snapshots for this seqNr, we delete all of them
|
||||
|
|
@ -69,13 +66,20 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo
|
|||
}(streamDispatcher).map(_ ⇒ ())(streamDispatcher)
|
||||
}
|
||||
|
||||
def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = {
|
||||
override def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = {
|
||||
val metadatas = snapshotMetadatas(persistenceId, criteria)
|
||||
Future.sequence {
|
||||
metadatas.map(deleteAsync)
|
||||
}(collection.breakOut, streamDispatcher).map(_ ⇒ ())(streamDispatcher)
|
||||
}
|
||||
|
||||
override def receivePluginInternal: Receive = {
|
||||
case SaveSnapshotSuccess(metadata) ⇒ saving -= metadata
|
||||
case _: SaveSnapshotFailure ⇒ // ignore
|
||||
case _: DeleteSnapshotsSuccess ⇒ // ignore
|
||||
case _: DeleteSnapshotsFailure ⇒ // ignore
|
||||
}
|
||||
|
||||
private def snapshotFiles(metadata: SnapshotMetadata): immutable.Seq[File] = {
|
||||
// pick all files for this persistenceId and sequenceNr, old journals could have created multiple entries with appended timestamps
|
||||
snapshotDir.listFiles(new SnapshotSeqNrFilenameFilter(metadata.persistenceId, metadata.sequenceNr)).toVector
|
||||
|
|
|
|||
|
|
@ -178,7 +178,7 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn
|
|||
persistentActor1 ! Recover(toSequenceNr = 4)
|
||||
persistentActor1 ! "done"
|
||||
|
||||
val metadata = expectMsgPF(hint = "" + SnapshotOffer(SnapshotMetadata(persistenceId, 4, 0), null)) {
|
||||
val metadata = expectMsgPF() {
|
||||
case SnapshotOffer(md @ SnapshotMetadata(`persistenceId`, 4, _), state) ⇒
|
||||
state should ===(List("a-1", "b-2", "c-3", "d-4").reverse)
|
||||
md
|
||||
|
|
@ -188,10 +188,7 @@ class SnapshotSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "Sn
|
|||
|
||||
persistentActor1 ! Delete1(metadata)
|
||||
deleteProbe.expectMsgType[DeleteSnapshot]
|
||||
expectMsgPF(hint = "" + DeleteSnapshotSuccess(SnapshotMetadata(`persistenceId`, 4, 0))) {
|
||||
case m @ DeleteSnapshotSuccess(SnapshotMetadata(`persistenceId`, 4, _)) ⇒
|
||||
info("success = " + m)
|
||||
}
|
||||
expectMsgPF() { case m @ DeleteSnapshotSuccess(SnapshotMetadata(`persistenceId`, 4, _)) ⇒ }
|
||||
|
||||
// recover persistentActor from 2nd snapshot (3rd was deleted) plus replayed messages
|
||||
val persistentActor2 = system.actorOf(Props(classOf[DeleteSnapshotTestPersistentActor], name, testActor))
|
||||
|
|
|
|||
|
|
@ -16,8 +16,8 @@ import akka.persistence.journal.leveldb.SharedLeveldbJournal;
|
|||
import akka.persistence.journal.leveldb.SharedLeveldbStore;
|
||||
import akka.japi.pf.ReceiveBuilder;
|
||||
import scala.concurrent.Future;
|
||||
import akka.japi.Option;
|
||||
import akka.japi.Procedure;
|
||||
import java.util.Optional;
|
||||
|
||||
|
||||
public class LambdaPersistencePluginDocTest {
|
||||
|
|
@ -56,7 +56,7 @@ public class LambdaPersistencePluginDocTest {
|
|||
|
||||
class MySnapshotStore extends SnapshotStore {
|
||||
@Override
|
||||
public Future<Option<SelectedSnapshot>> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria) {
|
||||
public Future<Optional<SelectedSnapshot>> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria) {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
@ -66,16 +66,12 @@ public class LambdaPersistencePluginDocTest {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onSaved(SnapshotMetadata metadata) throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Void> doDelete(SnapshotMetadata metadata) throws Exception {
|
||||
public Future<Void> doDeleteAsync(SnapshotMetadata metadata) {
|
||||
return Futures.successful(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<Void> doDelete(String persistenceId, SnapshotSelectionCriteria criteria) throws Exception {
|
||||
public Future<Void> doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria) {
|
||||
return Futures.successful(null);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue