!per #3717 Deletion API for snapshots and persistent messages

- single and bulk deletion of messages
- single and bulk deletion of snapshots
- run journal and snapshot store as system actors
- rename physical parameter in delete methods to permanent
- StashSupport.prepend docs and implementation enhancements
This commit is contained in:
Martin Krasser 2013-11-12 09:02:02 +01:00
parent e104e2a92c
commit 4489a72bea
32 changed files with 352 additions and 118 deletions

View file

@ -164,10 +164,11 @@ private[akka] trait StashSupport {
} }
/** /**
* Prepends `others` to this stash. * Prepends `others` to this stash. This method is optimized for a large stash and
* small `others`.
*/ */
private[akka] def prepend(others: immutable.Seq[Envelope]): Unit = private[akka] def prepend(others: immutable.Seq[Envelope]): Unit =
others.reverseIterator.foreach(env theStash = env +: theStash) theStash = others.foldRight(theStash)((e, s) e +: s)
/** /**
* Prepends the oldest message in the stash to the mailbox, and then removes that * Prepends the oldest message in the stash to the mailbox, and then removes that

View file

@ -101,7 +101,7 @@ public class PersistenceDocTest {
@Override @Override
public void preRestart(Throwable reason, Option<Object> message) { public void preRestart(Throwable reason, Option<Object> message) {
if (message.isDefined() && message.get() instanceof Persistent) { if (message.isDefined() && message.get() instanceof Persistent) {
deleteMessage((Persistent) message.get()); deleteMessage(((Persistent) message.get()).sequenceNr());
} }
super.preRestart(reason, message); super.preRestart(reason, message);
} }

View file

@ -32,6 +32,10 @@ public class PersistencePluginDocTest {
@Override @Override
public void doDelete(SnapshotMetadata metadata) throws Exception { public void doDelete(SnapshotMetadata metadata) throws Exception {
} }
@Override
public void doDelete(String processorId, SnapshotSelectionCriteria criteria) throws Exception {
}
} }
class MyAsyncJournal extends AsyncWriteJournal { class MyAsyncJournal extends AsyncWriteJournal {
@ -51,7 +55,7 @@ public class PersistencePluginDocTest {
} }
@Override @Override
public Future<Void> doDeleteAsync(String processorId, long sequenceNr, boolean physical) { public Future<Void> doDeleteAsync(String processorId, long fromSequenceNr, long toSequenceNr, boolean permanent) {
return null; return null;
} }

View file

@ -143,11 +143,12 @@ a replay of that message during recovery it can be deleted.
Message deletion Message deletion
---------------- ----------------
A processor can delete messages by calling the ``delete`` method with a ``Persistent`` message object or a A processor can delete a single message by calling the ``deleteMessage`` method with the sequence number of
sequence number as argument. An optional ``physical`` parameter specifies whether the message shall be that message as argument. An optional ``permanent`` parameter specifies whether the message shall be permanently
physically deleted from the journal or only marked as deleted. In both cases, the message won't be replayed. deleted from the journal or only marked as deleted. In both cases, the message won't be replayed. Later extensions
Later extensions to Akka persistence will allow to replay messages that have been marked as deleted which can to Akka persistence will allow to replay messages that have been marked as deleted which can be useful for debugging
be useful for debugging purposes, for example. purposes, for example. To delete all messages (journaled by a single processor) up to a specified sequence number,
processors can call the ``deleteMessages`` method.
Identifiers Identifiers
----------- -----------
@ -315,6 +316,13 @@ If not specified, they default to ``SnapshotSelectionCriteria.latest()`` which s
To disable snapshot-based recovery, applications should use ``SnapshotSelectionCriteria.none()``. A recovery where no To disable snapshot-based recovery, applications should use ``SnapshotSelectionCriteria.none()``. A recovery where no
saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay all journaled messages. saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay all journaled messages.
Snapshot deletion
-----------------
A processor can delete a single snapshot by calling the ``deleteSnapshot`` method with the sequence number and the
timestamp of the snapshot as argument. To bulk-delete snapshots that match a specified ``SnapshotSelectionCriteria``
argument, processors can call the ``deleteSnapshots`` method.
.. _event-sourcing-java: .. _event-sourcing-java:
Event sourcing Event sourcing

View file

@ -67,7 +67,7 @@ trait PersistenceDocSpec {
//#deletion //#deletion
override def preRestart(reason: Throwable, message: Option[Any]) { override def preRestart(reason: Throwable, message: Option[Any]) {
message match { message match {
case Some(p: Persistent) deleteMessage(p) case Some(p: Persistent) deleteMessage(p.sequenceNr)
case _ case _
} }
super.preRestart(reason, message) super.preRestart(reason, message)

View file

@ -71,7 +71,7 @@ class PersistencePluginDocSpec extends WordSpec {
class MyJournal extends AsyncWriteJournal { class MyJournal extends AsyncWriteJournal {
def writeAsync(persistent: PersistentRepr): Future[Unit] = ??? def writeAsync(persistent: PersistentRepr): Future[Unit] = ???
def writeBatchAsync(persistentBatch: Seq[PersistentRepr]): Future[Unit] = ??? def writeBatchAsync(persistentBatch: Seq[PersistentRepr]): Future[Unit] = ???
def deleteAsync(processorId: String, sequenceNr: Long, physical: Boolean): Future[Unit] = ??? def deleteAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean): Future[Unit] = ???
def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit] = ??? def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit] = ???
def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: (PersistentRepr) Unit): Future[Long] = ??? def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: (PersistentRepr) Unit): Future[Long] = ???
} }
@ -79,6 +79,7 @@ class MyJournal extends AsyncWriteJournal {
class MySnapshotStore extends SnapshotStore { class MySnapshotStore extends SnapshotStore {
def loadAsync(processorId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = ??? def loadAsync(processorId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = ???
def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = ??? def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = ???
def saved(metadata: SnapshotMetadata) {} def saved(metadata: SnapshotMetadata): Unit = ???
def delete(metadata: SnapshotMetadata) {} def delete(metadata: SnapshotMetadata): Unit = ???
def delete(processorId: String, criteria: SnapshotSelectionCriteria): Unit = ???
} }

View file

@ -138,11 +138,12 @@ a replay of that message during recovery it can be deleted.
Message deletion Message deletion
---------------- ----------------
A processor can delete messages by calling the ``delete`` method with a ``Persistent`` message object or a A processor can delete a single message by calling the ``deleteMessage`` method with the sequence number of
sequence number as argument. An optional ``physical`` parameter specifies whether the message shall be that message as argument. An optional ``permanent`` parameter specifies whether the message shall be permanently
physically deleted from the journal or only marked as deleted. In both cases, the message won't be replayed. deleted from the journal or only marked as deleted. In both cases, the message won't be replayed. Later extensions
Later extensions to Akka persistence will allow to replay messages that have been marked as deleted which can to Akka persistence will allow to replay messages that have been marked as deleted which can be useful for debugging
be useful for debugging purposes, for example. purposes, for example. To delete all messages (journaled by a single processor) up to a specified sequence number,
processors can call the ``deleteMessages`` method.
Identifiers Identifiers
----------- -----------
@ -326,6 +327,13 @@ If not specified, they default to ``SnapshotSelectionCriteria.Latest`` which sel
To disable snapshot-based recovery, applications should use ``SnapshotSelectionCriteria.None``. A recovery where no To disable snapshot-based recovery, applications should use ``SnapshotSelectionCriteria.None``. A recovery where no
saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay all journaled messages. saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay all journaled messages.
Snapshot deletion
-----------------
A processor can delete a single snapshot by calling the ``deleteSnapshot`` method with the sequence number and the
timestamp of the snapshot as argument. To bulk-delete snapshots that match a specified ``SnapshotSelectionCriteria``
argument, processors can call the ``deleteSnapshots`` method.
.. _event-sourcing: .. _event-sourcing:
Event sourcing Event sourcing

View file

@ -28,7 +28,7 @@ interface AsyncReplayPlugin {
* message must be contained in that message's `confirms` sequence. * message must be contained in that message's `confirms` sequence.
* *
* @param processorId processor id. * @param processorId processor id.
* @param fromSequenceNr sequence number where replay should start. * @param fromSequenceNr sequence number where replay should start (inclusive).
* @param toSequenceNr sequence number where replay should end (inclusive). * @param toSequenceNr sequence number where replay should end (inclusive).
* @param replayCallback called to replay a single message. Can be called from any * @param replayCallback called to replay a single message. Can be called from any
* thread. * thread.

View file

@ -23,11 +23,14 @@ interface AsyncWritePlugin {
Future<Void> doWriteBatchAsync(Iterable<PersistentRepr> persistentBatch); Future<Void> doWriteBatchAsync(Iterable<PersistentRepr> persistentBatch);
/** /**
* Java API, Plugin API: asynchronously deletes a persistent message. If `physical` * Java API, Plugin API: asynchronously deletes all persistent messages within the
* is set to `false`, the persistent message is marked as deleted, otherwise it is * range from `fromSequenceNr` to `toSequenceNr`. If `permanent` is set to `false`,
* physically deleted. * the persistent messages are marked as deleted, otherwise they are permanently
* deleted.
*
* @see AsyncReplayPlugin
*/ */
Future<Void> doDeleteAsync(String processorId, long sequenceNr, boolean physical); Future<Void> doDeleteAsync(String processorId, long fromSequenceNr, long toSequenceNr, boolean permanent);
/** /**
* Java API, Plugin API: asynchronously writes a delivery confirmation to the * Java API, Plugin API: asynchronously writes a delivery confirmation to the

View file

@ -21,11 +21,14 @@ interface SyncWritePlugin {
void doWriteBatch(Iterable<PersistentRepr> persistentBatch); void doWriteBatch(Iterable<PersistentRepr> persistentBatch);
/** /**
* Java API, Plugin API: synchronously deletes a persistent message. If `physical` * Java API, Plugin API: synchronously deletes all persistent messages within the
* is set to `false`, the persistent message is marked as deleted, otherwise it is * range from `fromSequenceNr` to `toSequenceNr`. If `permanent` is set to `false`,
* physically deleted. * the persistent messages are marked as deleted, otherwise they are permanently
* deleted.
*
* @see AsyncReplayPlugin
*/ */
void doDelete(String processorId, long sequenceNr, boolean physical); void doDelete(String processorId, long fromSequenceNr, long toSequenceNr, boolean permanent);
/** /**
* Java API, Plugin API: synchronously writes a delivery confirmation to the journal. * Java API, Plugin API: synchronously writes a delivery confirmation to the journal.

View file

@ -40,5 +40,13 @@ interface SnapshotStorePlugin {
* @param metadata snapshot metadata. * @param metadata snapshot metadata.
*/ */
void doDelete(SnapshotMetadata metadata) throws Exception; void doDelete(SnapshotMetadata metadata) throws Exception;
/**
* Java API, Plugin API: deletes all snapshots matching `criteria`.
*
* @param processorId processor id.
* @param criteria selection criteria for deleting.
*/
void doDelete(String processorId, SnapshotSelectionCriteria criteria) throws Exception;
//#snapshot-store-plugin-api //#snapshot-store-plugin-api
} }

View file

@ -64,11 +64,11 @@ private[persistence] trait Eventsourced extends Processor {
private val persistingEvents: State = new State { private val persistingEvents: State = new State {
def aroundReceive(receive: Receive, message: Any) = message match { def aroundReceive(receive: Receive, message: Any) = message match {
case PersistentBatch(b) { case PersistentBatch(b) {
b.foreach(p deleteMessage(p, true)) b.foreach(p deleteMessage(p.sequenceNr, true))
throw new UnsupportedOperationException("Persistent command batches not supported") throw new UnsupportedOperationException("Persistent command batches not supported")
} }
case p: PersistentRepr { case p: PersistentRepr {
deleteMessage(p, true) deleteMessage(p.sequenceNr, true)
throw new UnsupportedOperationException("Persistent commands not supported") throw new UnsupportedOperationException("Persistent commands not supported")
} }
case WriteSuccess(p) if identical(p.payload, persistInvocations.head._1) { case WriteSuccess(p) if identical(p.payload, persistInvocations.head._1) {

View file

@ -13,12 +13,12 @@ import akka.actor._
*/ */
private[persistence] object JournalProtocol { private[persistence] object JournalProtocol {
/** /**
* Instructs a journal to delete a persistent message identified by `processorId` * Instructs a journal to delete all persistent messages with sequence numbers in
* and `sequenceNr`. If `physical` is set to `false`, the persistent message is * the range from `fromSequenceNr` to `toSequenceNr` (both inclusive). If `permanent`
* marked as deleted in the journal, otherwise it is physically deleted from the * is set to `false`, the persistent messages are marked as deleted in the journal,
* journal. * otherwise they are permanently deleted from the journal.
*/ */
case class Delete(processorId: String, sequenceNr: Long, physical: Boolean) case class Delete(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean)
/** /**
* Instructs a journal to persist a sequence of messages. * Instructs a journal to persist a sequence of messages.

View file

@ -4,8 +4,6 @@
package akka.persistence package akka.persistence
import com.typesafe.config.Config
import akka.actor._ import akka.actor._
import akka.dispatch.Dispatchers import akka.dispatch.Dispatchers
import akka.persistence.journal.AsyncWriteJournal import akka.persistence.journal.AsyncWriteJournal
@ -35,6 +33,13 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
private val journal = createPlugin("journal", clazz private val journal = createPlugin("journal", clazz
if (classOf[AsyncWriteJournal].isAssignableFrom(clazz)) Dispatchers.DefaultDispatcherId else DefaultPluginDispatcherId) if (classOf[AsyncWriteJournal].isAssignableFrom(clazz)) Dispatchers.DefaultDispatcherId else DefaultPluginDispatcherId)
private[persistence] val publishPluginCommands: Boolean = {
val path = "publish-plugin-commands"
// this config option is only used internally (for testing
// purposes) and is therefore not defined in reference.conf
config.hasPath(path) && config.getBoolean(path)
}
/** /**
* Returns a snapshot store for a processor identified by `processorId`. * Returns a snapshot store for a processor identified by `processorId`.
*/ */
@ -69,7 +74,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
val pluginClassName = pluginConfig.getString("class") val pluginClassName = pluginConfig.getString("class")
val pluginClass = system.dynamicAccess.getClassFor[AnyRef](pluginClassName).get val pluginClass = system.dynamicAccess.getClassFor[AnyRef](pluginClassName).get
val pluginDispatcherId = if (pluginConfig.hasPath("plugin-dispatcher")) pluginConfig.getString("plugin-dispatcher") else dispatcherSelector(pluginClass) val pluginDispatcherId = if (pluginConfig.hasPath("plugin-dispatcher")) pluginConfig.getString("plugin-dispatcher") else dispatcherSelector(pluginClass)
system.actorOf(Props(pluginClass).withDispatcher(pluginDispatcherId)) system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(pluginClass).withDispatcher(pluginDispatcherId), pluginType)
} }
private def id(ref: ActorRef) = ref.path.toStringWithAddress(system.provider.getDefaultAddress) private def id(ref: ActorRef) = ref.path.toStringWithAddress(system.provider.getDefaultAddress)

View file

@ -235,54 +235,57 @@ trait Processor extends Actor with Stash with StashFactory {
implicit def currentPersistentMessage: Option[Persistent] = Option(_currentPersistent) implicit def currentPersistentMessage: Option[Persistent] = Option(_currentPersistent)
/** /**
* Marks the `persistent` message as deleted. A message marked as deleted is not replayed during * Marks a persistent message, identified by `sequenceNr`, as deleted. A message marked as deleted is
* recovery. This method is usually called inside `preRestartProcessor` when a persistent message * not replayed during recovery. This method is usually called inside `preRestartProcessor` when a
* caused an exception. Processors that want to re-receive that persistent message during recovery * persistent message caused an exception. Processors that want to re-receive that persistent message
* should not call this method. * during recovery should not call this method.
* *
* @param persistent persistent message to be marked as deleted. * @param sequenceNr sequence number of the persistent message to be deleted.
* @throws IllegalArgumentException if `persistent` message has not been persisted by this
* processor.
*/ */
def deleteMessage(persistent: Persistent): Unit = { def deleteMessage(sequenceNr: Long): Unit = {
deleteMessage(persistent, false) deleteMessage(sequenceNr, false)
} }
/** /**
* Deletes a `persistent` message. If `physical` is set to `false` (default), the persistent * Deletes a persistent message identified by `sequenceNr`. If `permanent` is set to `false`,
* message is marked as deleted in the journal, otherwise it is physically deleted from the * the persistent message is marked as deleted in the journal, otherwise it is permanently
* journal. A deleted message is not replayed during recovery. This method is usually called
* inside `preRestartProcessor` when a persistent message caused an exception. Processors that
* want to re-receive that persistent message during recovery should not call this method.
*
* @param persistent persistent message to be deleted.
* @param physical if `false` (default), the message is marked as deleted, otherwise it is
* physically deleted.
* @throws IllegalArgumentException if `persistent` message has not been persisted by this
* processor.
*/
def deleteMessage(persistent: Persistent, physical: Boolean): Unit = {
val impl = persistent.asInstanceOf[PersistentRepr]
if (impl.processorId != processorId)
throw new IllegalArgumentException(
s"persistent message to be deleted (processor id = [${impl.processorId}], sequence number = [${impl.sequenceNr}]) " +
s"has not been persisted by this processor (processor id = [${processorId}])")
else deleteMessage(impl.sequenceNr, physical)
}
/**
* Deletes a persistent message identified by `sequenceNr`. If `physical` is set to `false`,
* the persistent message is marked as deleted in the journal, otherwise it is physically
* deleted from the journal. A deleted message is not replayed during recovery. This method * deleted from the journal. A deleted message is not replayed during recovery. This method
* is usually called inside `preRestartProcessor` when a persistent message caused an exception. * is usually called inside `preRestartProcessor` when a persistent message caused an exception.
* Processors that want to re-receive that persistent message during recovery should not call * Processors that want to re-receive that persistent message during recovery should not call
* this method. * this method.
* *
* Later extensions may also allow a replay of messages that have been marked as deleted which can
* be useful in debugging environments.
*
* @param sequenceNr sequence number of the persistent message to be deleted. * @param sequenceNr sequence number of the persistent message to be deleted.
* @param physical if `false`, the message is marked as deleted, otherwise it is physically deleted. * @param permanent if `false`, the message is marked as deleted, otherwise it is permanently deleted.
*/ */
def deleteMessage(sequenceNr: Long, physical: Boolean): Unit = { def deleteMessage(sequenceNr: Long, permanent: Boolean): Unit = {
journal ! Delete(processorId, sequenceNr, physical) journal ! Delete(processorId, sequenceNr, sequenceNr, permanent)
}
/**
* Marks all persistent messages with sequence numbers less than or equal `toSequenceNr` as deleted.
*
* @param toSequenceNr upper sequence number bound of persistent messages to be deleted.
*/
def deleteMessages(toSequenceNr: Long): Unit = {
deleteMessages(toSequenceNr, false)
}
/**
* Deletes all persistent messages with sequence numbers less than or equal `toSequenceNr`. If `permanent`
* is set to `false`, the persistent messages are marked as deleted in the journal, otherwise
* they permanently deleted from the journal.
*
* Later extensions may also allow a replay of messages that have been marked as deleted which can
* be useful in debugging environments.
*
* @param toSequenceNr upper sequence number bound of persistent messages to be deleted.
* @param permanent if `false`, the message is marked as deleted, otherwise it is permanently deleted.
*/
def deleteMessages(toSequenceNr: Long, permanent: Boolean): Unit = {
journal ! Delete(processorId, 1L, toSequenceNr, permanent)
} }
/** /**
@ -293,6 +296,20 @@ trait Processor extends Actor with Stash with StashFactory {
snapshotStore ! SaveSnapshot(SnapshotMetadata(processorId, lastSequenceNr), snapshot) snapshotStore ! SaveSnapshot(SnapshotMetadata(processorId, lastSequenceNr), snapshot)
} }
/**
* Deletes a snapshot identified by `sequenceNr` and `timestamp`.
*/
def deleteSnapshot(sequenceNr: Long, timestamp: Long): Unit = {
snapshotStore ! DeleteSnapshot(SnapshotMetadata(processorId, sequenceNr, timestamp))
}
/**
* Deletes all snapshots matching `criteria`.
*/
def deleteSnapshots(criteria: SnapshotSelectionCriteria): Unit = {
snapshotStore ! DeleteSnapshots(processorId, criteria)
}
/** /**
* INTERNAL API. * INTERNAL API.
*/ */

View file

@ -41,7 +41,7 @@ case class SaveSnapshotFailure(metadata: SnapshotMetadata, cause: Throwable)
case class SnapshotOffer(metadata: SnapshotMetadata, snapshot: Any) case class SnapshotOffer(metadata: SnapshotMetadata, snapshot: Any)
/** /**
* Selection criteria for loading snapshots. * Selection criteria for loading and deleting snapshots.
* *
* @param maxSequenceNr upper bound for a selected snapshot's sequence number. Default is no upper bound. * @param maxSequenceNr upper bound for a selected snapshot's sequence number. Default is no upper bound.
* @param maxTimestamp upper bound for a selected snapshot's timestamp. Default is no upper bound. * @param maxTimestamp upper bound for a selected snapshot's timestamp. Default is no upper bound.
@ -52,6 +52,9 @@ case class SnapshotOffer(metadata: SnapshotMetadata, snapshot: Any)
case class SnapshotSelectionCriteria(maxSequenceNr: Long = Long.MaxValue, maxTimestamp: Long = Long.MaxValue) { case class SnapshotSelectionCriteria(maxSequenceNr: Long = Long.MaxValue, maxTimestamp: Long = Long.MaxValue) {
private[persistence] def limit(toSequenceNr: Long): SnapshotSelectionCriteria = private[persistence] def limit(toSequenceNr: Long): SnapshotSelectionCriteria =
if (toSequenceNr < maxSequenceNr) copy(maxSequenceNr = toSequenceNr) else this if (toSequenceNr < maxSequenceNr) copy(maxSequenceNr = toSequenceNr) else this
private[persistence] def matches(metadata: SnapshotMetadata): Boolean =
metadata.sequenceNr <= maxSequenceNr && metadata.timestamp <= maxTimestamp
} }
object SnapshotSelectionCriteria { object SnapshotSelectionCriteria {
@ -125,4 +128,19 @@ private[persistence] object SnapshotProtocol {
* @param snapshot snapshot. * @param snapshot snapshot.
*/ */
case class SaveSnapshot(metadata: SnapshotMetadata, snapshot: Any) case class SaveSnapshot(metadata: SnapshotMetadata, snapshot: Any)
/**
* Instructs snapshot store to delete a snapshot.
*
* @param metadata snapshot metadata.
*/
case class DeleteSnapshot(metadata: SnapshotMetadata)
/**
* Instructs snapshot store to delete all snapshots that match `criteria`.
*
* @param processorId processor id.
* @param criteria criteria for selecting snapshots to be deleted.
*/
case class DeleteSnapshots(processorId: String, criteria: SnapshotSelectionCriteria)
} }

View file

@ -29,7 +29,7 @@ trait AsyncReplay {
* message must be contained in that message's `confirms` sequence. * message must be contained in that message's `confirms` sequence.
* *
* @param processorId processor id. * @param processorId processor id.
* @param fromSequenceNr sequence number where replay should start. * @param fromSequenceNr sequence number where replay should start (inclusive).
* @param toSequenceNr sequence number where replay should end (inclusive). * @param toSequenceNr sequence number where replay should end (inclusive).
* @param replayCallback called to replay a single message. Can be called from any * @param replayCallback called to replay a single message. Can be called from any
* thread. * thread.

View file

@ -21,6 +21,8 @@ trait AsyncWriteJournal extends Actor with AsyncReplay {
import AsyncWriteJournal._ import AsyncWriteJournal._
import context.dispatcher import context.dispatcher
private val extension = Persistence(context.system)
private val resequencer = context.actorOf(Props[Resequencer]) private val resequencer = context.actorOf(Props[Resequencer])
private var resequencerCounter = 1L private var resequencerCounter = 1L
@ -62,14 +64,14 @@ trait AsyncWriteJournal extends Actor with AsyncReplay {
} }
case c @ Confirm(processorId, sequenceNr, channelId) { case c @ Confirm(processorId, sequenceNr, channelId) {
confirmAsync(processorId, sequenceNr, channelId) onComplete { confirmAsync(processorId, sequenceNr, channelId) onComplete {
case Success(_) context.system.eventStream.publish(c) case Success(_) if (extension.publishPluginCommands) context.system.eventStream.publish(c)
case Failure(e) // TODO: publish failure to event stream case Failure(e) // TODO: publish failure to event stream
} }
context.system.eventStream.publish(c) context.system.eventStream.publish(c)
} }
case d @ Delete(processorId, sequenceNr, physical) { case d @ Delete(processorId, fromSequenceNr, toSequenceNr, permanent) {
deleteAsync(processorId, sequenceNr, physical) onComplete { deleteAsync(processorId, fromSequenceNr, toSequenceNr, permanent) onComplete {
case Success(_) context.system.eventStream.publish(d) case Success(_) if (extension.publishPluginCommands) context.system.eventStream.publish(d)
case Failure(e) // TODO: publish failure to event stream case Failure(e) // TODO: publish failure to event stream
} }
} }
@ -93,11 +95,14 @@ trait AsyncWriteJournal extends Actor with AsyncReplay {
def writeBatchAsync(persistentBatch: immutable.Seq[PersistentRepr]): Future[Unit] def writeBatchAsync(persistentBatch: immutable.Seq[PersistentRepr]): Future[Unit]
/** /**
* Plugin API: asynchronously deletes a persistent message. If `physical` is set to * Plugin API: asynchronously deletes all persistent messages within the range from
* `false`, the persistent message is marked as deleted, otherwise it is physically * `fromSequenceNr` to `toSequenceNr` (both inclusive). If `permanent` is set to
* deleted. * `false`, the persistent messages are marked as deleted, otherwise they are
* permanently deleted.
*
* @see [[AsyncReplay]]
*/ */
def deleteAsync(processorId: String, sequenceNr: Long, physical: Boolean): Future[Unit] def deleteAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean): Future[Unit]
/** /**
* Plugin API: asynchronously writes a delivery confirmation to the journal. * Plugin API: asynchronously writes a delivery confirmation to the journal.

View file

@ -47,11 +47,11 @@ trait SyncWriteJournal extends Actor with AsyncReplay {
} }
case c @ Confirm(processorId, sequenceNr, channelId) { case c @ Confirm(processorId, sequenceNr, channelId) {
confirm(processorId, sequenceNr, channelId) confirm(processorId, sequenceNr, channelId)
context.system.eventStream.publish(c) // TODO: turn off by default and allow to turn on by configuration if (extension.publishPluginCommands) context.system.eventStream.publish(c)
} }
case d @ Delete(processorId, sequenceNr, physical) { case d @ Delete(processorId, fromSequenceNr, toSequenceNr, permanent) {
delete(processorId, sequenceNr, physical) delete(processorId, fromSequenceNr, toSequenceNr, permanent)
context.system.eventStream.publish(d) // TODO: turn off by default and allow to turn on by configuration if (extension.publishPluginCommands) context.system.eventStream.publish(d)
} }
case Loop(message, processor) { case Loop(message, processor) {
processor forward LoopSuccess(message) processor forward LoopSuccess(message)
@ -72,11 +72,14 @@ trait SyncWriteJournal extends Actor with AsyncReplay {
def writeBatch(persistentBatch: immutable.Seq[PersistentRepr]): Unit def writeBatch(persistentBatch: immutable.Seq[PersistentRepr]): Unit
/** /**
* Plugin API: synchronously deletes a persistent message. If `physical` is set to * Plugin API: synchronously deletes all persistent messages within the range from
* `false`, the persistent message is marked as deleted, otherwise it is physically * `fromSequenceNr` to `toSequenceNr` (both inclusive). If `permanent` is set to
* deleted. * `false`, the persistent messages are marked as deleted, otherwise they are
* permanently deleted.
*
* @see [[AsyncReplay]]
*/ */
def delete(processorId: String, sequenceNr: Long, physical: Boolean): Unit def delete(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean): Unit
/** /**
* Plugin API: synchronously writes a delivery confirmation to the journal. * Plugin API: synchronously writes a delivery confirmation to the journal.

View file

@ -33,8 +33,8 @@ private[persistence] class InmemJournal extends AsyncWriteJournal {
def writeBatchAsync(persistentBatch: immutable.Seq[PersistentRepr]): Future[Unit] = def writeBatchAsync(persistentBatch: immutable.Seq[PersistentRepr]): Future[Unit] =
(store ? WriteBatch(persistentBatch)).mapTo[Unit] (store ? WriteBatch(persistentBatch)).mapTo[Unit]
def deleteAsync(processorId: String, sequenceNr: Long, physical: Boolean): Future[Unit] = def deleteAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean): Future[Unit] =
(store ? Delete(processorId, sequenceNr, physical)).mapTo[Unit] (store ? Delete(processorId, fromSequenceNr, toSequenceNr, permanent)).mapTo[Unit]
def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit] = def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit] =
(store ? Confirm(processorId, sequenceNr, channelId)).mapTo[Unit] (store ? Confirm(processorId, sequenceNr, channelId)).mapTo[Unit]
@ -56,11 +56,11 @@ private[persistence] class InmemStore extends Actor {
case WriteBatch(pb) case WriteBatch(pb)
pb.foreach(add) pb.foreach(add)
success() success()
case Delete(pid, snr, false) case Delete(pid, fsnr, tsnr, false)
update(pid, snr)(_.update(deleted = true)) fsnr to tsnr foreach { snr update(pid, snr)(_.update(deleted = true)) }
success() success()
case Delete(pid, snr, true) case Delete(pid, fsnr, tsnr, true)
delete(pid, snr) fsnr to tsnr foreach { snr delete(pid, snr) }
success() success()
case Confirm(pid, snr, cid) case Confirm(pid, snr, cid)
update(pid, snr)(p p.update(confirms = cid +: p.confirms)) update(pid, snr)(p p.update(confirms = cid +: p.confirms))
@ -106,7 +106,7 @@ private[persistence] class InmemStore extends Actor {
private[persistence] object InmemStore { private[persistence] object InmemStore {
case class Write(p: PersistentRepr) case class Write(p: PersistentRepr)
case class WriteBatch(pb: Seq[PersistentRepr]) case class WriteBatch(pb: Seq[PersistentRepr])
case class Delete(processorId: String, sequenceNr: Long, physical: Boolean) case class Delete(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean)
case class Confirm(processorId: String, sequenceNr: Long, channelId: String) case class Confirm(processorId: String, sequenceNr: Long, channelId: String)
case class Replay(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, replayCallback: (PersistentRepr) Unit) case class Replay(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, replayCallback: (PersistentRepr) Unit)
} }

View file

@ -22,8 +22,8 @@ abstract class AsyncWriteJournal extends AsyncReplay with SAsyncWriteJournal wit
final def writeBatchAsync(persistentBatch: immutable.Seq[PersistentRepr]) = final def writeBatchAsync(persistentBatch: immutable.Seq[PersistentRepr]) =
doWriteBatchAsync(persistentBatch.asJava).map(Unit.unbox) doWriteBatchAsync(persistentBatch.asJava).map(Unit.unbox)
final def deleteAsync(processorId: String, sequenceNr: Long, physical: Boolean) = final def deleteAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean) =
doDeleteAsync(processorId, sequenceNr, physical).map(Unit.unbox) doDeleteAsync(processorId, fromSequenceNr, toSequenceNr, permanent).map(Unit.unbox)
final def confirmAsync(processorId: String, sequenceNr: Long, channelId: String) = final def confirmAsync(processorId: String, sequenceNr: Long, channelId: String) =
doConfirmAsync(processorId, sequenceNr, channelId).map(Unit.unbox) doConfirmAsync(processorId, sequenceNr, channelId).map(Unit.unbox)

View file

@ -20,8 +20,8 @@ abstract class SyncWriteJournal extends AsyncReplay with SSyncWriteJournal with
final def writeBatch(persistentBatch: immutable.Seq[PersistentRepr]) = final def writeBatch(persistentBatch: immutable.Seq[PersistentRepr]) =
doWriteBatch(persistentBatch.asJava) doWriteBatch(persistentBatch.asJava)
final def delete(processorId: String, sequenceNr: Long, physical: Boolean) = final def delete(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean) =
doDelete(processorId, sequenceNr, physical) doDelete(processorId, fromSequenceNr, toSequenceNr, permanent)
final def confirm(processorId: String, sequenceNr: Long, channelId: String) = final def confirm(processorId: String, sequenceNr: Long, channelId: String) =
doConfirm(processorId, sequenceNr, channelId) doConfirm(processorId, sequenceNr, channelId)

View file

@ -45,12 +45,14 @@ private[leveldb] class LeveldbJournal extends SyncWriteJournal with LeveldbIdMap
def writeBatch(persistentBatch: immutable.Seq[PersistentRepr]) = def writeBatch(persistentBatch: immutable.Seq[PersistentRepr]) =
withBatch(batch persistentBatch.foreach(persistent addToBatch(persistent, batch))) withBatch(batch persistentBatch.foreach(persistent addToBatch(persistent, batch)))
def delete(processorId: String, sequenceNr: Long, physical: Boolean) { def delete(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean) = withBatch { batch
if (physical) val nid = numericId(processorId)
// TODO: delete confirmations and deletion markers, if any. if (permanent) fromSequenceNr to toSequenceNr foreach { sequenceNr
leveldb.delete(keyToBytes(Key(numericId(processorId), sequenceNr, 0))) batch.delete(keyToBytes(Key(nid, sequenceNr, 0))) // TODO: delete confirmations and deletion markers, if any.
else }
leveldb.put(keyToBytes(deletionKey(numericId(processorId), sequenceNr)), Array.empty[Byte]) else fromSequenceNr to toSequenceNr foreach { sequenceNr
batch.put(keyToBytes(deletionKey(nid, sequenceNr)), Array.empty[Byte])
}
} }
def confirm(processorId: String, sequenceNr: Long, channelId: String) { def confirm(processorId: String, sequenceNr: Long, channelId: String) {

View file

@ -18,6 +18,8 @@ trait SnapshotStore extends Actor {
import SnapshotProtocol._ import SnapshotProtocol._
import context.dispatcher import context.dispatcher
private val extension = Persistence(context.system)
final def receive = { final def receive = {
case LoadSnapshot(processorId, criteria, toSequenceNr) { case LoadSnapshot(processorId, criteria, toSequenceNr) {
val p = sender val p = sender
@ -44,6 +46,14 @@ trait SnapshotStore extends Actor {
delete(metadata) delete(metadata)
sender ! evt // sender is processor sender ! evt // sender is processor
} }
case d @ DeleteSnapshot(metadata) {
delete(metadata)
if (extension.publishPluginCommands) context.system.eventStream.publish(d)
}
case d @ DeleteSnapshots(processorId, criteria) {
delete(processorId, criteria)
if (extension.publishPluginCommands) context.system.eventStream.publish(d)
}
} }
//#snapshot-store-plugin-api //#snapshot-store-plugin-api
@ -75,6 +85,15 @@ trait SnapshotStore extends Actor {
* *
* @param metadata snapshot metadata. * @param metadata snapshot metadata.
*/ */
def delete(metadata: SnapshotMetadata) def delete(metadata: SnapshotMetadata)
/**
* Plugin API: deletes all snapshots matching `criteria`.
*
* @param processorId processor id.
* @param criteria selection criteria for deleting.
*/
def delete(processorId: String, criteria: SnapshotSelectionCriteria)
//#snapshot-store-plugin-api //#snapshot-store-plugin-api
} }

View file

@ -27,4 +27,8 @@ abstract class SnapshotStore extends SSnapshotStore with SnapshotStorePlugin {
final def delete(metadata: SnapshotMetadata) = final def delete(metadata: SnapshotMetadata) =
doDelete(metadata) doDelete(metadata)
final def delete(processorId: String, criteria: SnapshotSelectionCriteria) =
doDelete(processorId: String, criteria: SnapshotSelectionCriteria)
} }

View file

@ -54,6 +54,13 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo
snapshotFile(metadata).delete() snapshotFile(metadata).delete()
} }
def delete(processorId: String, criteria: SnapshotSelectionCriteria) = {
snapshotMetadata.get(processorId) match {
case Some(mds) mds.filter(criteria.matches).foreach(delete)
case None
}
}
private def load(processorId: String, criteria: SnapshotSelectionCriteria): Option[SelectedSnapshot] = { private def load(processorId: String, criteria: SnapshotSelectionCriteria): Option[SelectedSnapshot] = {
@scala.annotation.tailrec @scala.annotation.tailrec
def load(metadata: SortedSet[SnapshotMetadata]): Option[SelectedSnapshot] = metadata.lastOption match { def load(metadata: SortedSet[SnapshotMetadata]): Option[SelectedSnapshot] = metadata.lastOption match {
@ -78,11 +85,7 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo
// //
// TODO: make number of loading attempts configurable // TODO: make number of loading attempts configurable
for { for (md load(metadata(processorId).filter(criteria.matches).takeRight(3))) yield md
md load(metadata(processorId).filter(md
md.sequenceNr <= criteria.maxSequenceNr &&
md.timestamp <= criteria.maxTimestamp).takeRight(3))
} yield md
} }
private def save(metadata: SnapshotMetadata, snapshot: Any): Unit = private def save(metadata: SnapshotMetadata, snapshot: Any): Unit =

View file

@ -56,6 +56,7 @@ object PersistenceSpec {
s""" s"""
serialize-creators = on serialize-creators = on
serialize-messages = on serialize-messages = on
akka.persistence.publish-plugin-commands = on
akka.persistence.journal.plugin = "akka.persistence.journal.${plugin}" akka.persistence.journal.plugin = "akka.persistence.journal.${plugin}"
akka.persistence.journal.leveldb.dir = "target/journal-${test}-spec" akka.persistence.journal.leveldb.dir = "target/journal-${test}-spec"
akka.persistence.snapshot-store.local.dir = "target/snapshots-${test}-spec/" akka.persistence.snapshot-store.local.dir = "target/snapshots-${test}-spec/"

View file

@ -23,7 +23,7 @@ object ProcessorSpec {
override def preRestart(reason: Throwable, message: Option[Any]) = { override def preRestart(reason: Throwable, message: Option[Any]) = {
message match { message match {
case Some(m: Persistent) deleteMessage(m) // delete message from journal case Some(m: Persistent) deleteMessage(m.sequenceNr) // delete message from journal
case _ // ignore case _ // ignore
} }
super.preRestart(reason, message) super.preRestart(reason, message)
@ -112,7 +112,7 @@ object ProcessorSpec {
class LastReplayedMsgFailsTestProcessor(name: String) extends RecoverTestProcessor(name) { class LastReplayedMsgFailsTestProcessor(name: String) extends RecoverTestProcessor(name) {
override def preRestart(reason: Throwable, message: Option[Any]) = { override def preRestart(reason: Throwable, message: Option[Any]) = {
message match { message match {
case Some(m: Persistent) if (recoveryRunning) deleteMessage(m) case Some(m: Persistent) if (recoveryRunning) deleteMessage(m.sequenceNr)
case _ case _
} }
super.preRestart(reason, message) super.preRestart(reason, message)
@ -126,10 +126,22 @@ object ProcessorSpec {
override def receive = failOnReplayedA orElse super.receive override def receive = failOnReplayedA orElse super.receive
} }
case class Delete1(snr: Long)
case class DeleteN(toSnr: Long)
class DeleteMessageTestProcessor(name: String) extends RecoverTestProcessor(name) {
override def receive = {
case Delete1(snr) deleteMessage(snr)
case DeleteN(toSnr) deleteMessages(toSnr)
case m super.receive(m)
}
}
} }
abstract class ProcessorSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender { abstract class ProcessorSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender {
import ProcessorSpec._ import ProcessorSpec._
import JournalProtocol._
override protected def beforeEach() { override protected def beforeEach() {
super.beforeEach() super.beforeEach()
@ -292,6 +304,40 @@ abstract class ProcessorSpec(config: Config) extends AkkaSpec(config) with Persi
processor ! GetState processor ! GetState
expectMsg(List("a-1", "b-2", "c-3", "d-4", "e-5", "f-6")) expectMsg(List("a-1", "b-2", "c-3", "d-4", "e-5", "f-6"))
} }
"support single message deletions" in {
val deleteProbe = TestProbe()
system.eventStream.subscribe(deleteProbe.ref, classOf[Delete])
val processor1 = namedProcessor[DeleteMessageTestProcessor]
processor1 ! Persistent("c")
processor1 ! Persistent("d")
processor1 ! Persistent("e")
processor1 ! Delete1(4)
deleteProbe.expectMsgType[Delete]
val processor2 = namedProcessor[DeleteMessageTestProcessor]
processor2 ! GetState
expectMsg(List("a-1", "b-2", "c-3", "e-5"))
}
"support bulk message deletions" in {
val deleteProbe = TestProbe()
system.eventStream.subscribe(deleteProbe.ref, classOf[Delete])
val processor1 = namedProcessor[DeleteMessageTestProcessor]
processor1 ! Persistent("c")
processor1 ! Persistent("d")
processor1 ! Persistent("e")
processor1 ! DeleteN(4)
deleteProbe.expectMsgType[Delete]
val processor2 = namedProcessor[DeleteMessageTestProcessor]
processor2 ! GetState
expectMsg(List("e-5"))
}
} }
"A processor" can { "A processor" can {

View file

@ -38,7 +38,7 @@ object ProcessorStashSpec {
class RecoveryFailureStashingProcessor(name: String) extends StashingProcessor(name) { class RecoveryFailureStashingProcessor(name: String) extends StashingProcessor(name) {
override def preRestart(reason: Throwable, message: Option[Any]) = { override def preRestart(reason: Throwable, message: Option[Any]) = {
message match { message match {
case Some(m: Persistent) if (recoveryRunning) deleteMessage(m) case Some(m: Persistent) if (recoveryRunning) deleteMessage(m.sequenceNr)
case _ case _
} }
super.preRestart(reason, message) super.preRestart(reason, message)

View file

@ -28,10 +28,22 @@ object SnapshotSpec {
} }
override def preStart() = () override def preStart() = ()
} }
case class Delete1(metadata: SnapshotMetadata)
case class DeleteN(criteria: SnapshotSelectionCriteria)
class DeleteSnapshotTestProcessor(name: String, probe: ActorRef) extends LoadSnapshotTestProcessor(name, probe) {
override def receive = {
case Delete1(metadata) deleteSnapshot(metadata.sequenceNr, metadata.timestamp)
case DeleteN(criteria) deleteSnapshots(criteria)
case other super.receive(other)
}
}
} }
class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "snapshot")) with PersistenceSpec with ImplicitSender { class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "snapshot")) with PersistenceSpec with ImplicitSender {
import SnapshotSpec._ import SnapshotSpec._
import SnapshotProtocol._
override protected def beforeEach() { override protected def beforeEach() {
super.beforeEach() super.beforeEach()
@ -134,5 +146,68 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "snapshot"
expectMsg("b-2") expectMsg("b-2")
expectMsg("c-3") expectMsg("c-3")
} }
"support single message deletions" in {
val deleteProbe = TestProbe()
val processor1 = system.actorOf(Props(classOf[DeleteSnapshotTestProcessor], name, testActor))
val processorId = name
system.eventStream.subscribe(deleteProbe.ref, classOf[DeleteSnapshot])
// recover processor from 3rd snapshot and then delete snapshot
processor1 ! Recover(toSequenceNr = 4)
processor1 ! "done"
val metadata = expectMsgPF() {
case (md @ SnapshotMetadata(`processorId`, 4, _), state) {
state must be(List("a-1", "b-2", "c-3", "d-4").reverse)
md
}
}
expectMsg("done")
processor1 ! Delete1(metadata)
deleteProbe.expectMsgType[DeleteSnapshot]
// recover processor from 2nd snapshot (3rd was deleted) plus replayed messages
val processor2 = system.actorOf(Props(classOf[DeleteSnapshotTestProcessor], name, testActor))
processor2 ! Recover(toSequenceNr = 4)
expectMsgPF() {
case (md @ SnapshotMetadata(`processorId`, 2, _), state) {
state must be(List("a-1", "b-2").reverse)
md
}
}
expectMsg("c-3")
expectMsg("d-4")
}
"support bulk message deletions" in {
val deleteProbe = TestProbe()
val processor1 = system.actorOf(Props(classOf[DeleteSnapshotTestProcessor], name, testActor))
val processorId = name
system.eventStream.subscribe(deleteProbe.ref, classOf[DeleteSnapshots])
// recover processor and the delete first three (= all) snapshots
processor1 ! Recover(toSequenceNr = 4)
processor1 ! DeleteN(SnapshotSelectionCriteria(maxSequenceNr = 4))
expectMsgPF() {
case (md @ SnapshotMetadata(`processorId`, 4, _), state) {
state must be(List("a-1", "b-2", "c-3", "d-4").reverse)
}
}
deleteProbe.expectMsgType[DeleteSnapshots]
// recover processor from replayed messages (all snapshots deleted)
val processor2 = system.actorOf(Props(classOf[DeleteSnapshotTestProcessor], name, testActor))
processor2 ! Recover(toSequenceNr = 4)
expectMsg("a-1")
expectMsg("b-2")
expectMsg("c-3")
expectMsg("d-4")
}
} }
} }

View file

@ -34,7 +34,7 @@ public class ProcessorFailureExample {
@Override @Override
public void preRestart(Throwable reason, Option<Object> message) { public void preRestart(Throwable reason, Option<Object> message) {
if (message.isDefined() && message.get() instanceof Persistent) { if (message.isDefined() && message.get() instanceof Persistent) {
deleteMessage((Persistent) message.get()); deleteMessage(((Persistent) message.get()).sequenceNr(), false);
} }
super.preRestart(reason, message); super.preRestart(reason, message);
} }

View file

@ -20,7 +20,7 @@ object ProcessorFailureExample extends App {
override def preRestart(reason: Throwable, message: Option[Any]) { override def preRestart(reason: Throwable, message: Option[Any]) {
message match { message match {
case Some(p: Persistent) if !recoveryRunning deleteMessage(p) // mark failing message as deleted case Some(p: Persistent) if !recoveryRunning deleteMessage(p.sequenceNr) // mark failing message as deleted
case _ // ignore case _ // ignore
} }
super.preRestart(reason, message) super.preRestart(reason, message)