Merge pull request #1838 from krasserm/wip-3717-deletion-api-krasserm

!per #3717 Deletion API for snapshots and persistent messages
This commit is contained in:
Patrik Nordwall 2013-11-21 00:54:44 -08:00
commit 524addd404
32 changed files with 352 additions and 118 deletions

View file

@ -164,10 +164,11 @@ private[akka] trait StashSupport {
}
/**
* Prepends `others` to this stash.
* Prepends `others` to this stash. This method is optimized for a large stash and
* small `others`.
*/
private[akka] def prepend(others: immutable.Seq[Envelope]): Unit =
others.reverseIterator.foreach(env theStash = env +: theStash)
theStash = others.foldRight(theStash)((e, s) e +: s)
/**
* Prepends the oldest message in the stash to the mailbox, and then removes that

View file

@ -101,7 +101,7 @@ public class PersistenceDocTest {
@Override
public void preRestart(Throwable reason, Option<Object> message) {
if (message.isDefined() && message.get() instanceof Persistent) {
deleteMessage((Persistent) message.get());
deleteMessage(((Persistent) message.get()).sequenceNr());
}
super.preRestart(reason, message);
}

View file

@ -32,6 +32,10 @@ public class PersistencePluginDocTest {
@Override
public void doDelete(SnapshotMetadata metadata) throws Exception {
}
@Override
public void doDelete(String processorId, SnapshotSelectionCriteria criteria) throws Exception {
}
}
class MyAsyncJournal extends AsyncWriteJournal {
@ -51,7 +55,7 @@ public class PersistencePluginDocTest {
}
@Override
public Future<Void> doDeleteAsync(String processorId, long sequenceNr, boolean physical) {
public Future<Void> doDeleteAsync(String processorId, long fromSequenceNr, long toSequenceNr, boolean permanent) {
return null;
}

View file

@ -143,11 +143,12 @@ a replay of that message during recovery it can be deleted.
Message deletion
----------------
A processor can delete messages by calling the ``delete`` method with a ``Persistent`` message object or a
sequence number as argument. An optional ``physical`` parameter specifies whether the message shall be
physically deleted from the journal or only marked as deleted. In both cases, the message won't be replayed.
Later extensions to Akka persistence will allow to replay messages that have been marked as deleted which can
be useful for debugging purposes, for example.
A processor can delete a single message by calling the ``deleteMessage`` method with the sequence number of
that message as argument. An optional ``permanent`` parameter specifies whether the message shall be permanently
deleted from the journal or only marked as deleted. In both cases, the message won't be replayed. Later extensions
to Akka persistence will allow to replay messages that have been marked as deleted which can be useful for debugging
purposes, for example. To delete all messages (journaled by a single processor) up to a specified sequence number,
processors can call the ``deleteMessages`` method.
Identifiers
-----------
@ -315,6 +316,13 @@ If not specified, they default to ``SnapshotSelectionCriteria.latest()`` which s
To disable snapshot-based recovery, applications should use ``SnapshotSelectionCriteria.none()``. A recovery where no
saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay all journaled messages.
Snapshot deletion
-----------------
A processor can delete a single snapshot by calling the ``deleteSnapshot`` method with the sequence number and the
timestamp of the snapshot as argument. To bulk-delete snapshots that match a specified ``SnapshotSelectionCriteria``
argument, processors can call the ``deleteSnapshots`` method.
.. _event-sourcing-java:
Event sourcing

View file

@ -67,7 +67,7 @@ trait PersistenceDocSpec {
//#deletion
override def preRestart(reason: Throwable, message: Option[Any]) {
message match {
case Some(p: Persistent) deleteMessage(p)
case Some(p: Persistent) deleteMessage(p.sequenceNr)
case _
}
super.preRestart(reason, message)

View file

@ -71,7 +71,7 @@ class PersistencePluginDocSpec extends WordSpec {
class MyJournal extends AsyncWriteJournal {
def writeAsync(persistent: PersistentRepr): Future[Unit] = ???
def writeBatchAsync(persistentBatch: Seq[PersistentRepr]): Future[Unit] = ???
def deleteAsync(processorId: String, sequenceNr: Long, physical: Boolean): Future[Unit] = ???
def deleteAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean): Future[Unit] = ???
def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit] = ???
def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: (PersistentRepr) Unit): Future[Long] = ???
}
@ -79,6 +79,7 @@ class MyJournal extends AsyncWriteJournal {
class MySnapshotStore extends SnapshotStore {
def loadAsync(processorId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = ???
def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = ???
def saved(metadata: SnapshotMetadata) {}
def delete(metadata: SnapshotMetadata) {}
def saved(metadata: SnapshotMetadata): Unit = ???
def delete(metadata: SnapshotMetadata): Unit = ???
def delete(processorId: String, criteria: SnapshotSelectionCriteria): Unit = ???
}

View file

@ -138,11 +138,12 @@ a replay of that message during recovery it can be deleted.
Message deletion
----------------
A processor can delete messages by calling the ``delete`` method with a ``Persistent`` message object or a
sequence number as argument. An optional ``physical`` parameter specifies whether the message shall be
physically deleted from the journal or only marked as deleted. In both cases, the message won't be replayed.
Later extensions to Akka persistence will allow to replay messages that have been marked as deleted which can
be useful for debugging purposes, for example.
A processor can delete a single message by calling the ``deleteMessage`` method with the sequence number of
that message as argument. An optional ``permanent`` parameter specifies whether the message shall be permanently
deleted from the journal or only marked as deleted. In both cases, the message won't be replayed. Later extensions
to Akka persistence will allow to replay messages that have been marked as deleted which can be useful for debugging
purposes, for example. To delete all messages (journaled by a single processor) up to a specified sequence number,
processors can call the ``deleteMessages`` method.
Identifiers
-----------
@ -326,6 +327,13 @@ If not specified, they default to ``SnapshotSelectionCriteria.Latest`` which sel
To disable snapshot-based recovery, applications should use ``SnapshotSelectionCriteria.None``. A recovery where no
saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay all journaled messages.
Snapshot deletion
-----------------
A processor can delete a single snapshot by calling the ``deleteSnapshot`` method with the sequence number and the
timestamp of the snapshot as argument. To bulk-delete snapshots that match a specified ``SnapshotSelectionCriteria``
argument, processors can call the ``deleteSnapshots`` method.
.. _event-sourcing:
Event sourcing

View file

@ -28,7 +28,7 @@ interface AsyncReplayPlugin {
* message must be contained in that message's `confirms` sequence.
*
* @param processorId processor id.
* @param fromSequenceNr sequence number where replay should start.
* @param fromSequenceNr sequence number where replay should start (inclusive).
* @param toSequenceNr sequence number where replay should end (inclusive).
* @param replayCallback called to replay a single message. Can be called from any
* thread.

View file

@ -23,11 +23,14 @@ interface AsyncWritePlugin {
Future<Void> doWriteBatchAsync(Iterable<PersistentRepr> persistentBatch);
/**
* Java API, Plugin API: asynchronously deletes a persistent message. If `physical`
* is set to `false`, the persistent message is marked as deleted, otherwise it is
* physically deleted.
* Java API, Plugin API: asynchronously deletes all persistent messages within the
* range from `fromSequenceNr` to `toSequenceNr`. If `permanent` is set to `false`,
* the persistent messages are marked as deleted, otherwise they are permanently
* deleted.
*
* @see AsyncReplayPlugin
*/
Future<Void> doDeleteAsync(String processorId, long sequenceNr, boolean physical);
Future<Void> doDeleteAsync(String processorId, long fromSequenceNr, long toSequenceNr, boolean permanent);
/**
* Java API, Plugin API: asynchronously writes a delivery confirmation to the

View file

@ -21,11 +21,14 @@ interface SyncWritePlugin {
void doWriteBatch(Iterable<PersistentRepr> persistentBatch);
/**
* Java API, Plugin API: synchronously deletes a persistent message. If `physical`
* is set to `false`, the persistent message is marked as deleted, otherwise it is
* physically deleted.
* Java API, Plugin API: synchronously deletes all persistent messages within the
* range from `fromSequenceNr` to `toSequenceNr`. If `permanent` is set to `false`,
* the persistent messages are marked as deleted, otherwise they are permanently
* deleted.
*
* @see AsyncReplayPlugin
*/
void doDelete(String processorId, long sequenceNr, boolean physical);
void doDelete(String processorId, long fromSequenceNr, long toSequenceNr, boolean permanent);
/**
* Java API, Plugin API: synchronously writes a delivery confirmation to the journal.

View file

@ -40,5 +40,13 @@ interface SnapshotStorePlugin {
* @param metadata snapshot metadata.
*/
void doDelete(SnapshotMetadata metadata) throws Exception;
/**
* Java API, Plugin API: deletes all snapshots matching `criteria`.
*
* @param processorId processor id.
* @param criteria selection criteria for deleting.
*/
void doDelete(String processorId, SnapshotSelectionCriteria criteria) throws Exception;
//#snapshot-store-plugin-api
}

View file

@ -64,11 +64,11 @@ private[persistence] trait Eventsourced extends Processor {
private val persistingEvents: State = new State {
def aroundReceive(receive: Receive, message: Any) = message match {
case PersistentBatch(b) {
b.foreach(p deleteMessage(p, true))
b.foreach(p deleteMessage(p.sequenceNr, true))
throw new UnsupportedOperationException("Persistent command batches not supported")
}
case p: PersistentRepr {
deleteMessage(p, true)
deleteMessage(p.sequenceNr, true)
throw new UnsupportedOperationException("Persistent commands not supported")
}
case WriteSuccess(p) if identical(p.payload, persistInvocations.head._1) {

View file

@ -13,12 +13,12 @@ import akka.actor._
*/
private[persistence] object JournalProtocol {
/**
* Instructs a journal to delete a persistent message identified by `processorId`
* and `sequenceNr`. If `physical` is set to `false`, the persistent message is
* marked as deleted in the journal, otherwise it is physically deleted from the
* journal.
* Instructs a journal to delete all persistent messages with sequence numbers in
* the range from `fromSequenceNr` to `toSequenceNr` (both inclusive). If `permanent`
* is set to `false`, the persistent messages are marked as deleted in the journal,
* otherwise they are permanently deleted from the journal.
*/
case class Delete(processorId: String, sequenceNr: Long, physical: Boolean)
case class Delete(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean)
/**
* Instructs a journal to persist a sequence of messages.

View file

@ -4,8 +4,6 @@
package akka.persistence
import com.typesafe.config.Config
import akka.actor._
import akka.dispatch.Dispatchers
import akka.persistence.journal.AsyncWriteJournal
@ -35,6 +33,13 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
private val journal = createPlugin("journal", clazz
if (classOf[AsyncWriteJournal].isAssignableFrom(clazz)) Dispatchers.DefaultDispatcherId else DefaultPluginDispatcherId)
private[persistence] val publishPluginCommands: Boolean = {
val path = "publish-plugin-commands"
// this config option is only used internally (for testing
// purposes) and is therefore not defined in reference.conf
config.hasPath(path) && config.getBoolean(path)
}
/**
* Returns a snapshot store for a processor identified by `processorId`.
*/
@ -69,7 +74,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
val pluginClassName = pluginConfig.getString("class")
val pluginClass = system.dynamicAccess.getClassFor[AnyRef](pluginClassName).get
val pluginDispatcherId = if (pluginConfig.hasPath("plugin-dispatcher")) pluginConfig.getString("plugin-dispatcher") else dispatcherSelector(pluginClass)
system.actorOf(Props(pluginClass).withDispatcher(pluginDispatcherId))
system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(pluginClass).withDispatcher(pluginDispatcherId), pluginType)
}
private def id(ref: ActorRef) = ref.path.toStringWithAddress(system.provider.getDefaultAddress)

View file

@ -235,54 +235,57 @@ trait Processor extends Actor with Stash with StashFactory {
implicit def currentPersistentMessage: Option[Persistent] = Option(_currentPersistent)
/**
* Marks the `persistent` message as deleted. A message marked as deleted is not replayed during
* recovery. This method is usually called inside `preRestartProcessor` when a persistent message
* caused an exception. Processors that want to re-receive that persistent message during recovery
* should not call this method.
* Marks a persistent message, identified by `sequenceNr`, as deleted. A message marked as deleted is
* not replayed during recovery. This method is usually called inside `preRestartProcessor` when a
* persistent message caused an exception. Processors that want to re-receive that persistent message
* during recovery should not call this method.
*
* @param persistent persistent message to be marked as deleted.
* @throws IllegalArgumentException if `persistent` message has not been persisted by this
* processor.
* @param sequenceNr sequence number of the persistent message to be deleted.
*/
def deleteMessage(persistent: Persistent): Unit = {
deleteMessage(persistent, false)
def deleteMessage(sequenceNr: Long): Unit = {
deleteMessage(sequenceNr, false)
}
/**
* Deletes a `persistent` message. If `physical` is set to `false` (default), the persistent
* message is marked as deleted in the journal, otherwise it is physically deleted from the
* journal. A deleted message is not replayed during recovery. This method is usually called
* inside `preRestartProcessor` when a persistent message caused an exception. Processors that
* want to re-receive that persistent message during recovery should not call this method.
*
* @param persistent persistent message to be deleted.
* @param physical if `false` (default), the message is marked as deleted, otherwise it is
* physically deleted.
* @throws IllegalArgumentException if `persistent` message has not been persisted by this
* processor.
*/
def deleteMessage(persistent: Persistent, physical: Boolean): Unit = {
val impl = persistent.asInstanceOf[PersistentRepr]
if (impl.processorId != processorId)
throw new IllegalArgumentException(
s"persistent message to be deleted (processor id = [${impl.processorId}], sequence number = [${impl.sequenceNr}]) " +
s"has not been persisted by this processor (processor id = [${processorId}])")
else deleteMessage(impl.sequenceNr, physical)
}
/**
* Deletes a persistent message identified by `sequenceNr`. If `physical` is set to `false`,
* the persistent message is marked as deleted in the journal, otherwise it is physically
* Deletes a persistent message identified by `sequenceNr`. If `permanent` is set to `false`,
* the persistent message is marked as deleted in the journal, otherwise it is permanently
* deleted from the journal. A deleted message is not replayed during recovery. This method
* is usually called inside `preRestartProcessor` when a persistent message caused an exception.
* Processors that want to re-receive that persistent message during recovery should not call
* this method.
*
* Later extensions may also allow a replay of messages that have been marked as deleted which can
* be useful in debugging environments.
*
* @param sequenceNr sequence number of the persistent message to be deleted.
* @param physical if `false`, the message is marked as deleted, otherwise it is physically deleted.
* @param permanent if `false`, the message is marked as deleted, otherwise it is permanently deleted.
*/
def deleteMessage(sequenceNr: Long, physical: Boolean): Unit = {
journal ! Delete(processorId, sequenceNr, physical)
def deleteMessage(sequenceNr: Long, permanent: Boolean): Unit = {
journal ! Delete(processorId, sequenceNr, sequenceNr, permanent)
}
/**
* Marks all persistent messages with sequence numbers less than or equal `toSequenceNr` as deleted.
*
* @param toSequenceNr upper sequence number bound of persistent messages to be deleted.
*/
def deleteMessages(toSequenceNr: Long): Unit = {
deleteMessages(toSequenceNr, false)
}
/**
* Deletes all persistent messages with sequence numbers less than or equal `toSequenceNr`. If `permanent`
* is set to `false`, the persistent messages are marked as deleted in the journal, otherwise
* they permanently deleted from the journal.
*
* Later extensions may also allow a replay of messages that have been marked as deleted which can
* be useful in debugging environments.
*
* @param toSequenceNr upper sequence number bound of persistent messages to be deleted.
* @param permanent if `false`, the message is marked as deleted, otherwise it is permanently deleted.
*/
def deleteMessages(toSequenceNr: Long, permanent: Boolean): Unit = {
journal ! Delete(processorId, 1L, toSequenceNr, permanent)
}
/**
@ -293,6 +296,20 @@ trait Processor extends Actor with Stash with StashFactory {
snapshotStore ! SaveSnapshot(SnapshotMetadata(processorId, lastSequenceNr), snapshot)
}
/**
* Deletes a snapshot identified by `sequenceNr` and `timestamp`.
*/
def deleteSnapshot(sequenceNr: Long, timestamp: Long): Unit = {
snapshotStore ! DeleteSnapshot(SnapshotMetadata(processorId, sequenceNr, timestamp))
}
/**
* Deletes all snapshots matching `criteria`.
*/
def deleteSnapshots(criteria: SnapshotSelectionCriteria): Unit = {
snapshotStore ! DeleteSnapshots(processorId, criteria)
}
/**
* INTERNAL API.
*/

View file

@ -41,7 +41,7 @@ case class SaveSnapshotFailure(metadata: SnapshotMetadata, cause: Throwable)
case class SnapshotOffer(metadata: SnapshotMetadata, snapshot: Any)
/**
* Selection criteria for loading snapshots.
* Selection criteria for loading and deleting snapshots.
*
* @param maxSequenceNr upper bound for a selected snapshot's sequence number. Default is no upper bound.
* @param maxTimestamp upper bound for a selected snapshot's timestamp. Default is no upper bound.
@ -52,6 +52,9 @@ case class SnapshotOffer(metadata: SnapshotMetadata, snapshot: Any)
case class SnapshotSelectionCriteria(maxSequenceNr: Long = Long.MaxValue, maxTimestamp: Long = Long.MaxValue) {
private[persistence] def limit(toSequenceNr: Long): SnapshotSelectionCriteria =
if (toSequenceNr < maxSequenceNr) copy(maxSequenceNr = toSequenceNr) else this
private[persistence] def matches(metadata: SnapshotMetadata): Boolean =
metadata.sequenceNr <= maxSequenceNr && metadata.timestamp <= maxTimestamp
}
object SnapshotSelectionCriteria {
@ -125,4 +128,19 @@ private[persistence] object SnapshotProtocol {
* @param snapshot snapshot.
*/
case class SaveSnapshot(metadata: SnapshotMetadata, snapshot: Any)
/**
* Instructs snapshot store to delete a snapshot.
*
* @param metadata snapshot metadata.
*/
case class DeleteSnapshot(metadata: SnapshotMetadata)
/**
* Instructs snapshot store to delete all snapshots that match `criteria`.
*
* @param processorId processor id.
* @param criteria criteria for selecting snapshots to be deleted.
*/
case class DeleteSnapshots(processorId: String, criteria: SnapshotSelectionCriteria)
}

View file

@ -29,7 +29,7 @@ trait AsyncReplay {
* message must be contained in that message's `confirms` sequence.
*
* @param processorId processor id.
* @param fromSequenceNr sequence number where replay should start.
* @param fromSequenceNr sequence number where replay should start (inclusive).
* @param toSequenceNr sequence number where replay should end (inclusive).
* @param replayCallback called to replay a single message. Can be called from any
* thread.

View file

@ -21,6 +21,8 @@ trait AsyncWriteJournal extends Actor with AsyncReplay {
import AsyncWriteJournal._
import context.dispatcher
private val extension = Persistence(context.system)
private val resequencer = context.actorOf(Props[Resequencer])
private var resequencerCounter = 1L
@ -62,14 +64,14 @@ trait AsyncWriteJournal extends Actor with AsyncReplay {
}
case c @ Confirm(processorId, sequenceNr, channelId) {
confirmAsync(processorId, sequenceNr, channelId) onComplete {
case Success(_) context.system.eventStream.publish(c)
case Success(_) if (extension.publishPluginCommands) context.system.eventStream.publish(c)
case Failure(e) // TODO: publish failure to event stream
}
context.system.eventStream.publish(c)
}
case d @ Delete(processorId, sequenceNr, physical) {
deleteAsync(processorId, sequenceNr, physical) onComplete {
case Success(_) context.system.eventStream.publish(d)
case d @ Delete(processorId, fromSequenceNr, toSequenceNr, permanent) {
deleteAsync(processorId, fromSequenceNr, toSequenceNr, permanent) onComplete {
case Success(_) if (extension.publishPluginCommands) context.system.eventStream.publish(d)
case Failure(e) // TODO: publish failure to event stream
}
}
@ -93,11 +95,14 @@ trait AsyncWriteJournal extends Actor with AsyncReplay {
def writeBatchAsync(persistentBatch: immutable.Seq[PersistentRepr]): Future[Unit]
/**
* Plugin API: asynchronously deletes a persistent message. If `physical` is set to
* `false`, the persistent message is marked as deleted, otherwise it is physically
* deleted.
* Plugin API: asynchronously deletes all persistent messages within the range from
* `fromSequenceNr` to `toSequenceNr` (both inclusive). If `permanent` is set to
* `false`, the persistent messages are marked as deleted, otherwise they are
* permanently deleted.
*
* @see [[AsyncReplay]]
*/
def deleteAsync(processorId: String, sequenceNr: Long, physical: Boolean): Future[Unit]
def deleteAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean): Future[Unit]
/**
* Plugin API: asynchronously writes a delivery confirmation to the journal.

View file

@ -47,11 +47,11 @@ trait SyncWriteJournal extends Actor with AsyncReplay {
}
case c @ Confirm(processorId, sequenceNr, channelId) {
confirm(processorId, sequenceNr, channelId)
context.system.eventStream.publish(c) // TODO: turn off by default and allow to turn on by configuration
if (extension.publishPluginCommands) context.system.eventStream.publish(c)
}
case d @ Delete(processorId, sequenceNr, physical) {
delete(processorId, sequenceNr, physical)
context.system.eventStream.publish(d) // TODO: turn off by default and allow to turn on by configuration
case d @ Delete(processorId, fromSequenceNr, toSequenceNr, permanent) {
delete(processorId, fromSequenceNr, toSequenceNr, permanent)
if (extension.publishPluginCommands) context.system.eventStream.publish(d)
}
case Loop(message, processor) {
processor forward LoopSuccess(message)
@ -72,11 +72,14 @@ trait SyncWriteJournal extends Actor with AsyncReplay {
def writeBatch(persistentBatch: immutable.Seq[PersistentRepr]): Unit
/**
* Plugin API: synchronously deletes a persistent message. If `physical` is set to
* `false`, the persistent message is marked as deleted, otherwise it is physically
* deleted.
* Plugin API: synchronously deletes all persistent messages within the range from
* `fromSequenceNr` to `toSequenceNr` (both inclusive). If `permanent` is set to
* `false`, the persistent messages are marked as deleted, otherwise they are
* permanently deleted.
*
* @see [[AsyncReplay]]
*/
def delete(processorId: String, sequenceNr: Long, physical: Boolean): Unit
def delete(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean): Unit
/**
* Plugin API: synchronously writes a delivery confirmation to the journal.

View file

@ -33,8 +33,8 @@ private[persistence] class InmemJournal extends AsyncWriteJournal {
def writeBatchAsync(persistentBatch: immutable.Seq[PersistentRepr]): Future[Unit] =
(store ? WriteBatch(persistentBatch)).mapTo[Unit]
def deleteAsync(processorId: String, sequenceNr: Long, physical: Boolean): Future[Unit] =
(store ? Delete(processorId, sequenceNr, physical)).mapTo[Unit]
def deleteAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean): Future[Unit] =
(store ? Delete(processorId, fromSequenceNr, toSequenceNr, permanent)).mapTo[Unit]
def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit] =
(store ? Confirm(processorId, sequenceNr, channelId)).mapTo[Unit]
@ -56,11 +56,11 @@ private[persistence] class InmemStore extends Actor {
case WriteBatch(pb)
pb.foreach(add)
success()
case Delete(pid, snr, false)
update(pid, snr)(_.update(deleted = true))
case Delete(pid, fsnr, tsnr, false)
fsnr to tsnr foreach { snr update(pid, snr)(_.update(deleted = true)) }
success()
case Delete(pid, snr, true)
delete(pid, snr)
case Delete(pid, fsnr, tsnr, true)
fsnr to tsnr foreach { snr delete(pid, snr) }
success()
case Confirm(pid, snr, cid)
update(pid, snr)(p p.update(confirms = cid +: p.confirms))
@ -106,7 +106,7 @@ private[persistence] class InmemStore extends Actor {
private[persistence] object InmemStore {
case class Write(p: PersistentRepr)
case class WriteBatch(pb: Seq[PersistentRepr])
case class Delete(processorId: String, sequenceNr: Long, physical: Boolean)
case class Delete(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean)
case class Confirm(processorId: String, sequenceNr: Long, channelId: String)
case class Replay(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, replayCallback: (PersistentRepr) Unit)
}

View file

@ -22,8 +22,8 @@ abstract class AsyncWriteJournal extends AsyncReplay with SAsyncWriteJournal wit
final def writeBatchAsync(persistentBatch: immutable.Seq[PersistentRepr]) =
doWriteBatchAsync(persistentBatch.asJava).map(Unit.unbox)
final def deleteAsync(processorId: String, sequenceNr: Long, physical: Boolean) =
doDeleteAsync(processorId, sequenceNr, physical).map(Unit.unbox)
final def deleteAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean) =
doDeleteAsync(processorId, fromSequenceNr, toSequenceNr, permanent).map(Unit.unbox)
final def confirmAsync(processorId: String, sequenceNr: Long, channelId: String) =
doConfirmAsync(processorId, sequenceNr, channelId).map(Unit.unbox)

View file

@ -20,8 +20,8 @@ abstract class SyncWriteJournal extends AsyncReplay with SSyncWriteJournal with
final def writeBatch(persistentBatch: immutable.Seq[PersistentRepr]) =
doWriteBatch(persistentBatch.asJava)
final def delete(processorId: String, sequenceNr: Long, physical: Boolean) =
doDelete(processorId, sequenceNr, physical)
final def delete(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean) =
doDelete(processorId, fromSequenceNr, toSequenceNr, permanent)
final def confirm(processorId: String, sequenceNr: Long, channelId: String) =
doConfirm(processorId, sequenceNr, channelId)

View file

@ -48,12 +48,14 @@ private[leveldb] class LeveldbJournal extends SyncWriteJournal with LeveldbIdMap
def writeBatch(persistentBatch: immutable.Seq[PersistentRepr]) =
withBatch(batch persistentBatch.foreach(persistent addToBatch(persistent, batch)))
def delete(processorId: String, sequenceNr: Long, physical: Boolean) {
if (physical)
// TODO: delete confirmations and deletion markers, if any.
leveldb.delete(keyToBytes(Key(numericId(processorId), sequenceNr, 0)))
else
leveldb.put(keyToBytes(deletionKey(numericId(processorId), sequenceNr)), Array.empty[Byte])
def delete(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean) = withBatch { batch
val nid = numericId(processorId)
if (permanent) fromSequenceNr to toSequenceNr foreach { sequenceNr
batch.delete(keyToBytes(Key(nid, sequenceNr, 0))) // TODO: delete confirmations and deletion markers, if any.
}
else fromSequenceNr to toSequenceNr foreach { sequenceNr
batch.put(keyToBytes(deletionKey(nid, sequenceNr)), Array.empty[Byte])
}
}
def confirm(processorId: String, sequenceNr: Long, channelId: String) {

View file

@ -18,6 +18,8 @@ trait SnapshotStore extends Actor {
import SnapshotProtocol._
import context.dispatcher
private val extension = Persistence(context.system)
final def receive = {
case LoadSnapshot(processorId, criteria, toSequenceNr) {
val p = sender
@ -44,6 +46,14 @@ trait SnapshotStore extends Actor {
delete(metadata)
sender ! evt // sender is processor
}
case d @ DeleteSnapshot(metadata) {
delete(metadata)
if (extension.publishPluginCommands) context.system.eventStream.publish(d)
}
case d @ DeleteSnapshots(processorId, criteria) {
delete(processorId, criteria)
if (extension.publishPluginCommands) context.system.eventStream.publish(d)
}
}
//#snapshot-store-plugin-api
@ -75,6 +85,15 @@ trait SnapshotStore extends Actor {
*
* @param metadata snapshot metadata.
*/
def delete(metadata: SnapshotMetadata)
/**
* Plugin API: deletes all snapshots matching `criteria`.
*
* @param processorId processor id.
* @param criteria selection criteria for deleting.
*/
def delete(processorId: String, criteria: SnapshotSelectionCriteria)
//#snapshot-store-plugin-api
}

View file

@ -27,4 +27,8 @@ abstract class SnapshotStore extends SSnapshotStore with SnapshotStorePlugin {
final def delete(metadata: SnapshotMetadata) =
doDelete(metadata)
final def delete(processorId: String, criteria: SnapshotSelectionCriteria) =
doDelete(processorId: String, criteria: SnapshotSelectionCriteria)
}

View file

@ -54,6 +54,13 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo
snapshotFile(metadata).delete()
}
def delete(processorId: String, criteria: SnapshotSelectionCriteria) = {
snapshotMetadata.get(processorId) match {
case Some(mds) mds.filter(criteria.matches).foreach(delete)
case None
}
}
private def load(processorId: String, criteria: SnapshotSelectionCriteria): Option[SelectedSnapshot] = {
@scala.annotation.tailrec
def load(metadata: SortedSet[SnapshotMetadata]): Option[SelectedSnapshot] = metadata.lastOption match {
@ -78,11 +85,7 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo
//
// TODO: make number of loading attempts configurable
for {
md load(metadata(processorId).filter(md
md.sequenceNr <= criteria.maxSequenceNr &&
md.timestamp <= criteria.maxTimestamp).takeRight(3))
} yield md
for (md load(metadata(processorId).filter(criteria.matches).takeRight(3))) yield md
}
private def save(metadata: SnapshotMetadata, snapshot: Any): Unit =

View file

@ -56,6 +56,7 @@ object PersistenceSpec {
s"""
serialize-creators = on
serialize-messages = on
akka.persistence.publish-plugin-commands = on
akka.persistence.journal.plugin = "akka.persistence.journal.${plugin}"
akka.persistence.journal.leveldb.dir = "target/journal-${test}-spec"
akka.persistence.snapshot-store.local.dir = "target/snapshots-${test}-spec/"

View file

@ -23,7 +23,7 @@ object ProcessorSpec {
override def preRestart(reason: Throwable, message: Option[Any]) = {
message match {
case Some(m: Persistent) deleteMessage(m) // delete message from journal
case Some(m: Persistent) deleteMessage(m.sequenceNr) // delete message from journal
case _ // ignore
}
super.preRestart(reason, message)
@ -112,7 +112,7 @@ object ProcessorSpec {
class LastReplayedMsgFailsTestProcessor(name: String) extends RecoverTestProcessor(name) {
override def preRestart(reason: Throwable, message: Option[Any]) = {
message match {
case Some(m: Persistent) if (recoveryRunning) deleteMessage(m)
case Some(m: Persistent) if (recoveryRunning) deleteMessage(m.sequenceNr)
case _
}
super.preRestart(reason, message)
@ -126,10 +126,22 @@ object ProcessorSpec {
override def receive = failOnReplayedA orElse super.receive
}
case class Delete1(snr: Long)
case class DeleteN(toSnr: Long)
class DeleteMessageTestProcessor(name: String) extends RecoverTestProcessor(name) {
override def receive = {
case Delete1(snr) deleteMessage(snr)
case DeleteN(toSnr) deleteMessages(toSnr)
case m super.receive(m)
}
}
}
abstract class ProcessorSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender {
import ProcessorSpec._
import JournalProtocol._
override protected def beforeEach() {
super.beforeEach()
@ -292,6 +304,40 @@ abstract class ProcessorSpec(config: Config) extends AkkaSpec(config) with Persi
processor ! GetState
expectMsg(List("a-1", "b-2", "c-3", "d-4", "e-5", "f-6"))
}
"support single message deletions" in {
val deleteProbe = TestProbe()
system.eventStream.subscribe(deleteProbe.ref, classOf[Delete])
val processor1 = namedProcessor[DeleteMessageTestProcessor]
processor1 ! Persistent("c")
processor1 ! Persistent("d")
processor1 ! Persistent("e")
processor1 ! Delete1(4)
deleteProbe.expectMsgType[Delete]
val processor2 = namedProcessor[DeleteMessageTestProcessor]
processor2 ! GetState
expectMsg(List("a-1", "b-2", "c-3", "e-5"))
}
"support bulk message deletions" in {
val deleteProbe = TestProbe()
system.eventStream.subscribe(deleteProbe.ref, classOf[Delete])
val processor1 = namedProcessor[DeleteMessageTestProcessor]
processor1 ! Persistent("c")
processor1 ! Persistent("d")
processor1 ! Persistent("e")
processor1 ! DeleteN(4)
deleteProbe.expectMsgType[Delete]
val processor2 = namedProcessor[DeleteMessageTestProcessor]
processor2 ! GetState
expectMsg(List("e-5"))
}
}
"A processor" can {

View file

@ -38,7 +38,7 @@ object ProcessorStashSpec {
class RecoveryFailureStashingProcessor(name: String) extends StashingProcessor(name) {
override def preRestart(reason: Throwable, message: Option[Any]) = {
message match {
case Some(m: Persistent) if (recoveryRunning) deleteMessage(m)
case Some(m: Persistent) if (recoveryRunning) deleteMessage(m.sequenceNr)
case _
}
super.preRestart(reason, message)

View file

@ -28,10 +28,22 @@ object SnapshotSpec {
}
override def preStart() = ()
}
case class Delete1(metadata: SnapshotMetadata)
case class DeleteN(criteria: SnapshotSelectionCriteria)
class DeleteSnapshotTestProcessor(name: String, probe: ActorRef) extends LoadSnapshotTestProcessor(name, probe) {
override def receive = {
case Delete1(metadata) deleteSnapshot(metadata.sequenceNr, metadata.timestamp)
case DeleteN(criteria) deleteSnapshots(criteria)
case other super.receive(other)
}
}
}
class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "snapshot")) with PersistenceSpec with ImplicitSender {
import SnapshotSpec._
import SnapshotProtocol._
override protected def beforeEach() {
super.beforeEach()
@ -134,5 +146,68 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "snapshot"
expectMsg("b-2")
expectMsg("c-3")
}
"support single message deletions" in {
val deleteProbe = TestProbe()
val processor1 = system.actorOf(Props(classOf[DeleteSnapshotTestProcessor], name, testActor))
val processorId = name
system.eventStream.subscribe(deleteProbe.ref, classOf[DeleteSnapshot])
// recover processor from 3rd snapshot and then delete snapshot
processor1 ! Recover(toSequenceNr = 4)
processor1 ! "done"
val metadata = expectMsgPF() {
case (md @ SnapshotMetadata(`processorId`, 4, _), state) {
state must be(List("a-1", "b-2", "c-3", "d-4").reverse)
md
}
}
expectMsg("done")
processor1 ! Delete1(metadata)
deleteProbe.expectMsgType[DeleteSnapshot]
// recover processor from 2nd snapshot (3rd was deleted) plus replayed messages
val processor2 = system.actorOf(Props(classOf[DeleteSnapshotTestProcessor], name, testActor))
processor2 ! Recover(toSequenceNr = 4)
expectMsgPF() {
case (md @ SnapshotMetadata(`processorId`, 2, _), state) {
state must be(List("a-1", "b-2").reverse)
md
}
}
expectMsg("c-3")
expectMsg("d-4")
}
"support bulk message deletions" in {
val deleteProbe = TestProbe()
val processor1 = system.actorOf(Props(classOf[DeleteSnapshotTestProcessor], name, testActor))
val processorId = name
system.eventStream.subscribe(deleteProbe.ref, classOf[DeleteSnapshots])
// recover processor and the delete first three (= all) snapshots
processor1 ! Recover(toSequenceNr = 4)
processor1 ! DeleteN(SnapshotSelectionCriteria(maxSequenceNr = 4))
expectMsgPF() {
case (md @ SnapshotMetadata(`processorId`, 4, _), state) {
state must be(List("a-1", "b-2", "c-3", "d-4").reverse)
}
}
deleteProbe.expectMsgType[DeleteSnapshots]
// recover processor from replayed messages (all snapshots deleted)
val processor2 = system.actorOf(Props(classOf[DeleteSnapshotTestProcessor], name, testActor))
processor2 ! Recover(toSequenceNr = 4)
expectMsg("a-1")
expectMsg("b-2")
expectMsg("c-3")
expectMsg("d-4")
}
}
}

View file

@ -34,7 +34,7 @@ public class ProcessorFailureExample {
@Override
public void preRestart(Throwable reason, Option<Object> message) {
if (message.isDefined() && message.get() instanceof Persistent) {
deleteMessage((Persistent) message.get());
deleteMessage(((Persistent) message.get()).sequenceNr(), false);
}
super.preRestart(reason, message);
}

View file

@ -20,7 +20,7 @@ object ProcessorFailureExample extends App {
override def preRestart(reason: Throwable, message: Option[Any]) {
message match {
case Some(p: Persistent) if !recoveryRunning deleteMessage(p) // mark failing message as deleted
case Some(p: Persistent) if !recoveryRunning deleteMessage(p.sequenceNr) // mark failing message as deleted
case _ // ignore
}
super.preRestart(reason, message)