diff --git a/akka-actor/src/main/scala/akka/actor/Stash.scala b/akka-actor/src/main/scala/akka/actor/Stash.scala index 29e89353aa..e770db5d19 100644 --- a/akka-actor/src/main/scala/akka/actor/Stash.scala +++ b/akka-actor/src/main/scala/akka/actor/Stash.scala @@ -164,10 +164,11 @@ private[akka] trait StashSupport { } /** - * Prepends `others` to this stash. + * Prepends `others` to this stash. This method is optimized for a large stash and + * small `others`. */ private[akka] def prepend(others: immutable.Seq[Envelope]): Unit = - others.reverseIterator.foreach(env ⇒ theStash = env +: theStash) + theStash = others.foldRight(theStash)((e, s) ⇒ e +: s) /** * Prepends the oldest message in the stash to the mailbox, and then removes that diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java index 7f1a1f7c35..5ea08d247c 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java @@ -101,7 +101,7 @@ public class PersistenceDocTest { @Override public void preRestart(Throwable reason, Option message) { if (message.isDefined() && message.get() instanceof Persistent) { - deleteMessage((Persistent) message.get()); + deleteMessage(((Persistent) message.get()).sequenceNr()); } super.preRestart(reason, message); } diff --git a/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java index 5d1c04582d..9bd3428e24 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java @@ -32,6 +32,10 @@ public class PersistencePluginDocTest { @Override public void doDelete(SnapshotMetadata metadata) throws Exception { } + + @Override + public void doDelete(String processorId, SnapshotSelectionCriteria criteria) throws Exception { + } } class MyAsyncJournal extends AsyncWriteJournal { @@ -51,7 +55,7 @@ public class PersistencePluginDocTest { } @Override - public Future doDeleteAsync(String processorId, long sequenceNr, boolean physical) { + public Future doDeleteAsync(String processorId, long fromSequenceNr, long toSequenceNr, boolean permanent) { return null; } diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 76f1a19132..83179dd628 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -143,11 +143,12 @@ a replay of that message during recovery it can be deleted. Message deletion ---------------- -A processor can delete messages by calling the ``delete`` method with a ``Persistent`` message object or a -sequence number as argument. An optional ``physical`` parameter specifies whether the message shall be -physically deleted from the journal or only marked as deleted. In both cases, the message won't be replayed. -Later extensions to Akka persistence will allow to replay messages that have been marked as deleted which can -be useful for debugging purposes, for example. +A processor can delete a single message by calling the ``deleteMessage`` method with the sequence number of +that message as argument. An optional ``permanent`` parameter specifies whether the message shall be permanently +deleted from the journal or only marked as deleted. In both cases, the message won't be replayed. Later extensions +to Akka persistence will allow to replay messages that have been marked as deleted which can be useful for debugging +purposes, for example. To delete all messages (journaled by a single processor) up to a specified sequence number, +processors can call the ``deleteMessages`` method. Identifiers ----------- @@ -315,6 +316,13 @@ If not specified, they default to ``SnapshotSelectionCriteria.latest()`` which s To disable snapshot-based recovery, applications should use ``SnapshotSelectionCriteria.none()``. A recovery where no saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay all journaled messages. +Snapshot deletion +----------------- + +A processor can delete a single snapshot by calling the ``deleteSnapshot`` method with the sequence number and the +timestamp of the snapshot as argument. To bulk-delete snapshots that match a specified ``SnapshotSelectionCriteria`` +argument, processors can call the ``deleteSnapshots`` method. + .. _event-sourcing-java: Event sourcing diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala index beb23f9203..abac9ec6d3 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala @@ -67,7 +67,7 @@ trait PersistenceDocSpec { //#deletion override def preRestart(reason: Throwable, message: Option[Any]) { message match { - case Some(p: Persistent) ⇒ deleteMessage(p) + case Some(p: Persistent) ⇒ deleteMessage(p.sequenceNr) case _ ⇒ } super.preRestart(reason, message) diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala index 6f975f0e29..7380c59b1c 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala @@ -71,7 +71,7 @@ class PersistencePluginDocSpec extends WordSpec { class MyJournal extends AsyncWriteJournal { def writeAsync(persistent: PersistentRepr): Future[Unit] = ??? def writeBatchAsync(persistentBatch: Seq[PersistentRepr]): Future[Unit] = ??? - def deleteAsync(processorId: String, sequenceNr: Long, physical: Boolean): Future[Unit] = ??? + def deleteAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean): Future[Unit] = ??? def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit] = ??? def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: (PersistentRepr) ⇒ Unit): Future[Long] = ??? } @@ -79,6 +79,7 @@ class MyJournal extends AsyncWriteJournal { class MySnapshotStore extends SnapshotStore { def loadAsync(processorId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = ??? def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = ??? - def saved(metadata: SnapshotMetadata) {} - def delete(metadata: SnapshotMetadata) {} + def saved(metadata: SnapshotMetadata): Unit = ??? + def delete(metadata: SnapshotMetadata): Unit = ??? + def delete(processorId: String, criteria: SnapshotSelectionCriteria): Unit = ??? } diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index d36e71c7c0..738d77be31 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -138,11 +138,12 @@ a replay of that message during recovery it can be deleted. Message deletion ---------------- -A processor can delete messages by calling the ``delete`` method with a ``Persistent`` message object or a -sequence number as argument. An optional ``physical`` parameter specifies whether the message shall be -physically deleted from the journal or only marked as deleted. In both cases, the message won't be replayed. -Later extensions to Akka persistence will allow to replay messages that have been marked as deleted which can -be useful for debugging purposes, for example. +A processor can delete a single message by calling the ``deleteMessage`` method with the sequence number of +that message as argument. An optional ``permanent`` parameter specifies whether the message shall be permanently +deleted from the journal or only marked as deleted. In both cases, the message won't be replayed. Later extensions +to Akka persistence will allow to replay messages that have been marked as deleted which can be useful for debugging +purposes, for example. To delete all messages (journaled by a single processor) up to a specified sequence number, +processors can call the ``deleteMessages`` method. Identifiers ----------- @@ -326,6 +327,13 @@ If not specified, they default to ``SnapshotSelectionCriteria.Latest`` which sel To disable snapshot-based recovery, applications should use ``SnapshotSelectionCriteria.None``. A recovery where no saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay all journaled messages. +Snapshot deletion +----------------- + +A processor can delete a single snapshot by calling the ``deleteSnapshot`` method with the sequence number and the +timestamp of the snapshot as argument. To bulk-delete snapshots that match a specified ``SnapshotSelectionCriteria`` +argument, processors can call the ``deleteSnapshots`` method. + .. _event-sourcing: Event sourcing diff --git a/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncReplayPlugin.java b/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncReplayPlugin.java index e8927e75fe..099f277532 100644 --- a/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncReplayPlugin.java +++ b/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncReplayPlugin.java @@ -28,7 +28,7 @@ interface AsyncReplayPlugin { * message must be contained in that message's `confirms` sequence. * * @param processorId processor id. - * @param fromSequenceNr sequence number where replay should start. + * @param fromSequenceNr sequence number where replay should start (inclusive). * @param toSequenceNr sequence number where replay should end (inclusive). * @param replayCallback called to replay a single message. Can be called from any * thread. diff --git a/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java b/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java index cc6424b38f..713c09886d 100644 --- a/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java +++ b/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java @@ -23,11 +23,14 @@ interface AsyncWritePlugin { Future doWriteBatchAsync(Iterable persistentBatch); /** - * Java API, Plugin API: asynchronously deletes a persistent message. If `physical` - * is set to `false`, the persistent message is marked as deleted, otherwise it is - * physically deleted. + * Java API, Plugin API: asynchronously deletes all persistent messages within the + * range from `fromSequenceNr` to `toSequenceNr`. If `permanent` is set to `false`, + * the persistent messages are marked as deleted, otherwise they are permanently + * deleted. + * + * @see AsyncReplayPlugin */ - Future doDeleteAsync(String processorId, long sequenceNr, boolean physical); + Future doDeleteAsync(String processorId, long fromSequenceNr, long toSequenceNr, boolean permanent); /** * Java API, Plugin API: asynchronously writes a delivery confirmation to the diff --git a/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java b/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java index 4ee4e85169..09665ff23a 100644 --- a/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java +++ b/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java @@ -21,11 +21,14 @@ interface SyncWritePlugin { void doWriteBatch(Iterable persistentBatch); /** - * Java API, Plugin API: synchronously deletes a persistent message. If `physical` - * is set to `false`, the persistent message is marked as deleted, otherwise it is - * physically deleted. + * Java API, Plugin API: synchronously deletes all persistent messages within the + * range from `fromSequenceNr` to `toSequenceNr`. If `permanent` is set to `false`, + * the persistent messages are marked as deleted, otherwise they are permanently + * deleted. + * + * @see AsyncReplayPlugin */ - void doDelete(String processorId, long sequenceNr, boolean physical); + void doDelete(String processorId, long fromSequenceNr, long toSequenceNr, boolean permanent); /** * Java API, Plugin API: synchronously writes a delivery confirmation to the journal. diff --git a/akka-persistence/src/main/java/akka/persistence/snapshot/japi/SnapshotStorePlugin.java b/akka-persistence/src/main/java/akka/persistence/snapshot/japi/SnapshotStorePlugin.java index 9b844c90df..7c76b51af8 100644 --- a/akka-persistence/src/main/java/akka/persistence/snapshot/japi/SnapshotStorePlugin.java +++ b/akka-persistence/src/main/java/akka/persistence/snapshot/japi/SnapshotStorePlugin.java @@ -40,5 +40,13 @@ interface SnapshotStorePlugin { * @param metadata snapshot metadata. */ void doDelete(SnapshotMetadata metadata) throws Exception; + + /** + * Java API, Plugin API: deletes all snapshots matching `criteria`. + * + * @param processorId processor id. + * @param criteria selection criteria for deleting. + */ + void doDelete(String processorId, SnapshotSelectionCriteria criteria) throws Exception; //#snapshot-store-plugin-api } diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index d74d51ffce..dc4d5dcd20 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -64,11 +64,11 @@ private[persistence] trait Eventsourced extends Processor { private val persistingEvents: State = new State { def aroundReceive(receive: Receive, message: Any) = message match { case PersistentBatch(b) ⇒ { - b.foreach(p ⇒ deleteMessage(p, true)) + b.foreach(p ⇒ deleteMessage(p.sequenceNr, true)) throw new UnsupportedOperationException("Persistent command batches not supported") } case p: PersistentRepr ⇒ { - deleteMessage(p, true) + deleteMessage(p.sequenceNr, true) throw new UnsupportedOperationException("Persistent commands not supported") } case WriteSuccess(p) if identical(p.payload, persistInvocations.head._1) ⇒ { diff --git a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala index 014b10d67d..c3cee50399 100644 --- a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala +++ b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala @@ -13,12 +13,12 @@ import akka.actor._ */ private[persistence] object JournalProtocol { /** - * Instructs a journal to delete a persistent message identified by `processorId` - * and `sequenceNr`. If `physical` is set to `false`, the persistent message is - * marked as deleted in the journal, otherwise it is physically deleted from the - * journal. + * Instructs a journal to delete all persistent messages with sequence numbers in + * the range from `fromSequenceNr` to `toSequenceNr` (both inclusive). If `permanent` + * is set to `false`, the persistent messages are marked as deleted in the journal, + * otherwise they are permanently deleted from the journal. */ - case class Delete(processorId: String, sequenceNr: Long, physical: Boolean) + case class Delete(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean) /** * Instructs a journal to persist a sequence of messages. diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala index 7685b8ae4e..0483674888 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala @@ -4,8 +4,6 @@ package akka.persistence -import com.typesafe.config.Config - import akka.actor._ import akka.dispatch.Dispatchers import akka.persistence.journal.AsyncWriteJournal @@ -35,6 +33,13 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { private val journal = createPlugin("journal", clazz ⇒ if (classOf[AsyncWriteJournal].isAssignableFrom(clazz)) Dispatchers.DefaultDispatcherId else DefaultPluginDispatcherId) + private[persistence] val publishPluginCommands: Boolean = { + val path = "publish-plugin-commands" + // this config option is only used internally (for testing + // purposes) and is therefore not defined in reference.conf + config.hasPath(path) && config.getBoolean(path) + } + /** * Returns a snapshot store for a processor identified by `processorId`. */ @@ -69,7 +74,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { val pluginClassName = pluginConfig.getString("class") val pluginClass = system.dynamicAccess.getClassFor[AnyRef](pluginClassName).get val pluginDispatcherId = if (pluginConfig.hasPath("plugin-dispatcher")) pluginConfig.getString("plugin-dispatcher") else dispatcherSelector(pluginClass) - system.actorOf(Props(pluginClass).withDispatcher(pluginDispatcherId)) + system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(pluginClass).withDispatcher(pluginDispatcherId), pluginType) } private def id(ref: ActorRef) = ref.path.toStringWithAddress(system.provider.getDefaultAddress) diff --git a/akka-persistence/src/main/scala/akka/persistence/Processor.scala b/akka-persistence/src/main/scala/akka/persistence/Processor.scala index 460d70e2ee..76b80f8a1c 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Processor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Processor.scala @@ -235,54 +235,57 @@ trait Processor extends Actor with Stash with StashFactory { implicit def currentPersistentMessage: Option[Persistent] = Option(_currentPersistent) /** - * Marks the `persistent` message as deleted. A message marked as deleted is not replayed during - * recovery. This method is usually called inside `preRestartProcessor` when a persistent message - * caused an exception. Processors that want to re-receive that persistent message during recovery - * should not call this method. + * Marks a persistent message, identified by `sequenceNr`, as deleted. A message marked as deleted is + * not replayed during recovery. This method is usually called inside `preRestartProcessor` when a + * persistent message caused an exception. Processors that want to re-receive that persistent message + * during recovery should not call this method. * - * @param persistent persistent message to be marked as deleted. - * @throws IllegalArgumentException if `persistent` message has not been persisted by this - * processor. + * @param sequenceNr sequence number of the persistent message to be deleted. */ - def deleteMessage(persistent: Persistent): Unit = { - deleteMessage(persistent, false) + def deleteMessage(sequenceNr: Long): Unit = { + deleteMessage(sequenceNr, false) } /** - * Deletes a `persistent` message. If `physical` is set to `false` (default), the persistent - * message is marked as deleted in the journal, otherwise it is physically deleted from the - * journal. A deleted message is not replayed during recovery. This method is usually called - * inside `preRestartProcessor` when a persistent message caused an exception. Processors that - * want to re-receive that persistent message during recovery should not call this method. - * - * @param persistent persistent message to be deleted. - * @param physical if `false` (default), the message is marked as deleted, otherwise it is - * physically deleted. - * @throws IllegalArgumentException if `persistent` message has not been persisted by this - * processor. - */ - def deleteMessage(persistent: Persistent, physical: Boolean): Unit = { - val impl = persistent.asInstanceOf[PersistentRepr] - if (impl.processorId != processorId) - throw new IllegalArgumentException( - s"persistent message to be deleted (processor id = [${impl.processorId}], sequence number = [${impl.sequenceNr}]) " + - s"has not been persisted by this processor (processor id = [${processorId}])") - else deleteMessage(impl.sequenceNr, physical) - } - - /** - * Deletes a persistent message identified by `sequenceNr`. If `physical` is set to `false`, - * the persistent message is marked as deleted in the journal, otherwise it is physically + * Deletes a persistent message identified by `sequenceNr`. If `permanent` is set to `false`, + * the persistent message is marked as deleted in the journal, otherwise it is permanently * deleted from the journal. A deleted message is not replayed during recovery. This method * is usually called inside `preRestartProcessor` when a persistent message caused an exception. * Processors that want to re-receive that persistent message during recovery should not call * this method. * + * Later extensions may also allow a replay of messages that have been marked as deleted which can + * be useful in debugging environments. + * * @param sequenceNr sequence number of the persistent message to be deleted. - * @param physical if `false`, the message is marked as deleted, otherwise it is physically deleted. + * @param permanent if `false`, the message is marked as deleted, otherwise it is permanently deleted. */ - def deleteMessage(sequenceNr: Long, physical: Boolean): Unit = { - journal ! Delete(processorId, sequenceNr, physical) + def deleteMessage(sequenceNr: Long, permanent: Boolean): Unit = { + journal ! Delete(processorId, sequenceNr, sequenceNr, permanent) + } + + /** + * Marks all persistent messages with sequence numbers less than or equal `toSequenceNr` as deleted. + * + * @param toSequenceNr upper sequence number bound of persistent messages to be deleted. + */ + def deleteMessages(toSequenceNr: Long): Unit = { + deleteMessages(toSequenceNr, false) + } + + /** + * Deletes all persistent messages with sequence numbers less than or equal `toSequenceNr`. If `permanent` + * is set to `false`, the persistent messages are marked as deleted in the journal, otherwise + * they permanently deleted from the journal. + * + * Later extensions may also allow a replay of messages that have been marked as deleted which can + * be useful in debugging environments. + * + * @param toSequenceNr upper sequence number bound of persistent messages to be deleted. + * @param permanent if `false`, the message is marked as deleted, otherwise it is permanently deleted. + */ + def deleteMessages(toSequenceNr: Long, permanent: Boolean): Unit = { + journal ! Delete(processorId, 1L, toSequenceNr, permanent) } /** @@ -293,6 +296,20 @@ trait Processor extends Actor with Stash with StashFactory { snapshotStore ! SaveSnapshot(SnapshotMetadata(processorId, lastSequenceNr), snapshot) } + /** + * Deletes a snapshot identified by `sequenceNr` and `timestamp`. + */ + def deleteSnapshot(sequenceNr: Long, timestamp: Long): Unit = { + snapshotStore ! DeleteSnapshot(SnapshotMetadata(processorId, sequenceNr, timestamp)) + } + + /** + * Deletes all snapshots matching `criteria`. + */ + def deleteSnapshots(criteria: SnapshotSelectionCriteria): Unit = { + snapshotStore ! DeleteSnapshots(processorId, criteria) + } + /** * INTERNAL API. */ diff --git a/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala b/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala index 725ae45d7a..25c5cd2278 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala @@ -41,7 +41,7 @@ case class SaveSnapshotFailure(metadata: SnapshotMetadata, cause: Throwable) case class SnapshotOffer(metadata: SnapshotMetadata, snapshot: Any) /** - * Selection criteria for loading snapshots. + * Selection criteria for loading and deleting snapshots. * * @param maxSequenceNr upper bound for a selected snapshot's sequence number. Default is no upper bound. * @param maxTimestamp upper bound for a selected snapshot's timestamp. Default is no upper bound. @@ -52,6 +52,9 @@ case class SnapshotOffer(metadata: SnapshotMetadata, snapshot: Any) case class SnapshotSelectionCriteria(maxSequenceNr: Long = Long.MaxValue, maxTimestamp: Long = Long.MaxValue) { private[persistence] def limit(toSequenceNr: Long): SnapshotSelectionCriteria = if (toSequenceNr < maxSequenceNr) copy(maxSequenceNr = toSequenceNr) else this + + private[persistence] def matches(metadata: SnapshotMetadata): Boolean = + metadata.sequenceNr <= maxSequenceNr && metadata.timestamp <= maxTimestamp } object SnapshotSelectionCriteria { @@ -125,4 +128,19 @@ private[persistence] object SnapshotProtocol { * @param snapshot snapshot. */ case class SaveSnapshot(metadata: SnapshotMetadata, snapshot: Any) + + /** + * Instructs snapshot store to delete a snapshot. + * + * @param metadata snapshot metadata. + */ + case class DeleteSnapshot(metadata: SnapshotMetadata) + + /** + * Instructs snapshot store to delete all snapshots that match `criteria`. + * + * @param processorId processor id. + * @param criteria criteria for selecting snapshots to be deleted. + */ + case class DeleteSnapshots(processorId: String, criteria: SnapshotSelectionCriteria) } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncReplay.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncReplay.scala index ea022b9abb..43488be6a2 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncReplay.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncReplay.scala @@ -29,7 +29,7 @@ trait AsyncReplay { * message must be contained in that message's `confirms` sequence. * * @param processorId processor id. - * @param fromSequenceNr sequence number where replay should start. + * @param fromSequenceNr sequence number where replay should start (inclusive). * @param toSequenceNr sequence number where replay should end (inclusive). * @param replayCallback called to replay a single message. Can be called from any * thread. diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index d86123b705..c7a8488c49 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -21,6 +21,8 @@ trait AsyncWriteJournal extends Actor with AsyncReplay { import AsyncWriteJournal._ import context.dispatcher + private val extension = Persistence(context.system) + private val resequencer = context.actorOf(Props[Resequencer]) private var resequencerCounter = 1L @@ -62,14 +64,14 @@ trait AsyncWriteJournal extends Actor with AsyncReplay { } case c @ Confirm(processorId, sequenceNr, channelId) ⇒ { confirmAsync(processorId, sequenceNr, channelId) onComplete { - case Success(_) ⇒ context.system.eventStream.publish(c) + case Success(_) ⇒ if (extension.publishPluginCommands) context.system.eventStream.publish(c) case Failure(e) ⇒ // TODO: publish failure to event stream } context.system.eventStream.publish(c) } - case d @ Delete(processorId, sequenceNr, physical) ⇒ { - deleteAsync(processorId, sequenceNr, physical) onComplete { - case Success(_) ⇒ context.system.eventStream.publish(d) + case d @ Delete(processorId, fromSequenceNr, toSequenceNr, permanent) ⇒ { + deleteAsync(processorId, fromSequenceNr, toSequenceNr, permanent) onComplete { + case Success(_) ⇒ if (extension.publishPluginCommands) context.system.eventStream.publish(d) case Failure(e) ⇒ // TODO: publish failure to event stream } } @@ -93,11 +95,14 @@ trait AsyncWriteJournal extends Actor with AsyncReplay { def writeBatchAsync(persistentBatch: immutable.Seq[PersistentRepr]): Future[Unit] /** - * Plugin API: asynchronously deletes a persistent message. If `physical` is set to - * `false`, the persistent message is marked as deleted, otherwise it is physically - * deleted. + * Plugin API: asynchronously deletes all persistent messages within the range from + * `fromSequenceNr` to `toSequenceNr` (both inclusive). If `permanent` is set to + * `false`, the persistent messages are marked as deleted, otherwise they are + * permanently deleted. + * + * @see [[AsyncReplay]] */ - def deleteAsync(processorId: String, sequenceNr: Long, physical: Boolean): Future[Unit] + def deleteAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean): Future[Unit] /** * Plugin API: asynchronously writes a delivery confirmation to the journal. diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala index 77ca00ca81..3b667523f3 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala @@ -47,11 +47,11 @@ trait SyncWriteJournal extends Actor with AsyncReplay { } case c @ Confirm(processorId, sequenceNr, channelId) ⇒ { confirm(processorId, sequenceNr, channelId) - context.system.eventStream.publish(c) // TODO: turn off by default and allow to turn on by configuration + if (extension.publishPluginCommands) context.system.eventStream.publish(c) } - case d @ Delete(processorId, sequenceNr, physical) ⇒ { - delete(processorId, sequenceNr, physical) - context.system.eventStream.publish(d) // TODO: turn off by default and allow to turn on by configuration + case d @ Delete(processorId, fromSequenceNr, toSequenceNr, permanent) ⇒ { + delete(processorId, fromSequenceNr, toSequenceNr, permanent) + if (extension.publishPluginCommands) context.system.eventStream.publish(d) } case Loop(message, processor) ⇒ { processor forward LoopSuccess(message) @@ -72,11 +72,14 @@ trait SyncWriteJournal extends Actor with AsyncReplay { def writeBatch(persistentBatch: immutable.Seq[PersistentRepr]): Unit /** - * Plugin API: synchronously deletes a persistent message. If `physical` is set to - * `false`, the persistent message is marked as deleted, otherwise it is physically - * deleted. + * Plugin API: synchronously deletes all persistent messages within the range from + * `fromSequenceNr` to `toSequenceNr` (both inclusive). If `permanent` is set to + * `false`, the persistent messages are marked as deleted, otherwise they are + * permanently deleted. + * + * @see [[AsyncReplay]] */ - def delete(processorId: String, sequenceNr: Long, physical: Boolean): Unit + def delete(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean): Unit /** * Plugin API: synchronously writes a delivery confirmation to the journal. diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala index 92317b45ee..68b39f66aa 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala @@ -33,8 +33,8 @@ private[persistence] class InmemJournal extends AsyncWriteJournal { def writeBatchAsync(persistentBatch: immutable.Seq[PersistentRepr]): Future[Unit] = (store ? WriteBatch(persistentBatch)).mapTo[Unit] - def deleteAsync(processorId: String, sequenceNr: Long, physical: Boolean): Future[Unit] = - (store ? Delete(processorId, sequenceNr, physical)).mapTo[Unit] + def deleteAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean): Future[Unit] = + (store ? Delete(processorId, fromSequenceNr, toSequenceNr, permanent)).mapTo[Unit] def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit] = (store ? Confirm(processorId, sequenceNr, channelId)).mapTo[Unit] @@ -56,11 +56,11 @@ private[persistence] class InmemStore extends Actor { case WriteBatch(pb) ⇒ pb.foreach(add) success() - case Delete(pid, snr, false) ⇒ - update(pid, snr)(_.update(deleted = true)) + case Delete(pid, fsnr, tsnr, false) ⇒ + fsnr to tsnr foreach { snr ⇒ update(pid, snr)(_.update(deleted = true)) } success() - case Delete(pid, snr, true) ⇒ - delete(pid, snr) + case Delete(pid, fsnr, tsnr, true) ⇒ + fsnr to tsnr foreach { snr ⇒ delete(pid, snr) } success() case Confirm(pid, snr, cid) ⇒ update(pid, snr)(p ⇒ p.update(confirms = cid +: p.confirms)) @@ -106,7 +106,7 @@ private[persistence] class InmemStore extends Actor { private[persistence] object InmemStore { case class Write(p: PersistentRepr) case class WriteBatch(pb: Seq[PersistentRepr]) - case class Delete(processorId: String, sequenceNr: Long, physical: Boolean) + case class Delete(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean) case class Confirm(processorId: String, sequenceNr: Long, channelId: String) case class Replay(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, replayCallback: (PersistentRepr) ⇒ Unit) } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala index f158c0fbfb..23a9dc08de 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala @@ -22,8 +22,8 @@ abstract class AsyncWriteJournal extends AsyncReplay with SAsyncWriteJournal wit final def writeBatchAsync(persistentBatch: immutable.Seq[PersistentRepr]) = doWriteBatchAsync(persistentBatch.asJava).map(Unit.unbox) - final def deleteAsync(processorId: String, sequenceNr: Long, physical: Boolean) = - doDeleteAsync(processorId, sequenceNr, physical).map(Unit.unbox) + final def deleteAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean) = + doDeleteAsync(processorId, fromSequenceNr, toSequenceNr, permanent).map(Unit.unbox) final def confirmAsync(processorId: String, sequenceNr: Long, channelId: String) = doConfirmAsync(processorId, sequenceNr, channelId).map(Unit.unbox) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala index 43f91f19d9..55e0fdbe05 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala @@ -20,8 +20,8 @@ abstract class SyncWriteJournal extends AsyncReplay with SSyncWriteJournal with final def writeBatch(persistentBatch: immutable.Seq[PersistentRepr]) = doWriteBatch(persistentBatch.asJava) - final def delete(processorId: String, sequenceNr: Long, physical: Boolean) = - doDelete(processorId, sequenceNr, physical) + final def delete(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean) = + doDelete(processorId, fromSequenceNr, toSequenceNr, permanent) final def confirm(processorId: String, sequenceNr: Long, channelId: String) = doConfirm(processorId, sequenceNr, channelId) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala index 6e8404df89..13bb43ad13 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala @@ -45,12 +45,14 @@ private[leveldb] class LeveldbJournal extends SyncWriteJournal with LeveldbIdMap def writeBatch(persistentBatch: immutable.Seq[PersistentRepr]) = withBatch(batch ⇒ persistentBatch.foreach(persistent ⇒ addToBatch(persistent, batch))) - def delete(processorId: String, sequenceNr: Long, physical: Boolean) { - if (physical) - // TODO: delete confirmations and deletion markers, if any. - leveldb.delete(keyToBytes(Key(numericId(processorId), sequenceNr, 0))) - else - leveldb.put(keyToBytes(deletionKey(numericId(processorId), sequenceNr)), Array.empty[Byte]) + def delete(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean) = withBatch { batch ⇒ + val nid = numericId(processorId) + if (permanent) fromSequenceNr to toSequenceNr foreach { sequenceNr ⇒ + batch.delete(keyToBytes(Key(nid, sequenceNr, 0))) // TODO: delete confirmations and deletion markers, if any. + } + else fromSequenceNr to toSequenceNr foreach { sequenceNr ⇒ + batch.put(keyToBytes(deletionKey(nid, sequenceNr)), Array.empty[Byte]) + } } def confirm(processorId: String, sequenceNr: Long, channelId: String) { diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala index b3422cc5ac..55248c2858 100644 --- a/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala @@ -18,6 +18,8 @@ trait SnapshotStore extends Actor { import SnapshotProtocol._ import context.dispatcher + private val extension = Persistence(context.system) + final def receive = { case LoadSnapshot(processorId, criteria, toSequenceNr) ⇒ { val p = sender @@ -44,6 +46,14 @@ trait SnapshotStore extends Actor { delete(metadata) sender ! evt // sender is processor } + case d @ DeleteSnapshot(metadata) ⇒ { + delete(metadata) + if (extension.publishPluginCommands) context.system.eventStream.publish(d) + } + case d @ DeleteSnapshots(processorId, criteria) ⇒ { + delete(processorId, criteria) + if (extension.publishPluginCommands) context.system.eventStream.publish(d) + } } //#snapshot-store-plugin-api @@ -75,6 +85,15 @@ trait SnapshotStore extends Actor { * * @param metadata snapshot metadata. */ + def delete(metadata: SnapshotMetadata) + + /** + * Plugin API: deletes all snapshots matching `criteria`. + * + * @param processorId processor id. + * @param criteria selection criteria for deleting. + */ + def delete(processorId: String, criteria: SnapshotSelectionCriteria) //#snapshot-store-plugin-api } diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/japi/SnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/japi/SnapshotStore.scala index d73294dc77..e3dd34ffa7 100644 --- a/akka-persistence/src/main/scala/akka/persistence/snapshot/japi/SnapshotStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/japi/SnapshotStore.scala @@ -27,4 +27,8 @@ abstract class SnapshotStore extends SSnapshotStore with SnapshotStorePlugin { final def delete(metadata: SnapshotMetadata) = doDelete(metadata) + + final def delete(processorId: String, criteria: SnapshotSelectionCriteria) = + doDelete(processorId: String, criteria: SnapshotSelectionCriteria) + } diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala index d0d27655f9..ccdf1c98d9 100644 --- a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala @@ -54,6 +54,13 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo snapshotFile(metadata).delete() } + def delete(processorId: String, criteria: SnapshotSelectionCriteria) = { + snapshotMetadata.get(processorId) match { + case Some(mds) ⇒ mds.filter(criteria.matches).foreach(delete) + case None ⇒ + } + } + private def load(processorId: String, criteria: SnapshotSelectionCriteria): Option[SelectedSnapshot] = { @scala.annotation.tailrec def load(metadata: SortedSet[SnapshotMetadata]): Option[SelectedSnapshot] = metadata.lastOption match { @@ -78,11 +85,7 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo // // TODO: make number of loading attempts configurable - for { - md ← load(metadata(processorId).filter(md ⇒ - md.sequenceNr <= criteria.maxSequenceNr && - md.timestamp <= criteria.maxTimestamp).takeRight(3)) - } yield md + for (md ← load(metadata(processorId).filter(criteria.matches).takeRight(3))) yield md } private def save(metadata: SnapshotMetadata, snapshot: Any): Unit = diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala index 8bb3ee3eff..90138ce8cb 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala @@ -56,6 +56,7 @@ object PersistenceSpec { s""" serialize-creators = on serialize-messages = on + akka.persistence.publish-plugin-commands = on akka.persistence.journal.plugin = "akka.persistence.journal.${plugin}" akka.persistence.journal.leveldb.dir = "target/journal-${test}-spec" akka.persistence.snapshot-store.local.dir = "target/snapshots-${test}-spec/" diff --git a/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala index 0b8dc56f4a..525476e078 100644 --- a/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala @@ -23,7 +23,7 @@ object ProcessorSpec { override def preRestart(reason: Throwable, message: Option[Any]) = { message match { - case Some(m: Persistent) ⇒ deleteMessage(m) // delete message from journal + case Some(m: Persistent) ⇒ deleteMessage(m.sequenceNr) // delete message from journal case _ ⇒ // ignore } super.preRestart(reason, message) @@ -112,7 +112,7 @@ object ProcessorSpec { class LastReplayedMsgFailsTestProcessor(name: String) extends RecoverTestProcessor(name) { override def preRestart(reason: Throwable, message: Option[Any]) = { message match { - case Some(m: Persistent) ⇒ if (recoveryRunning) deleteMessage(m) + case Some(m: Persistent) ⇒ if (recoveryRunning) deleteMessage(m.sequenceNr) case _ ⇒ } super.preRestart(reason, message) @@ -126,10 +126,22 @@ object ProcessorSpec { override def receive = failOnReplayedA orElse super.receive } + + case class Delete1(snr: Long) + case class DeleteN(toSnr: Long) + + class DeleteMessageTestProcessor(name: String) extends RecoverTestProcessor(name) { + override def receive = { + case Delete1(snr) ⇒ deleteMessage(snr) + case DeleteN(toSnr) ⇒ deleteMessages(toSnr) + case m ⇒ super.receive(m) + } + } } abstract class ProcessorSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender { import ProcessorSpec._ + import JournalProtocol._ override protected def beforeEach() { super.beforeEach() @@ -292,6 +304,40 @@ abstract class ProcessorSpec(config: Config) extends AkkaSpec(config) with Persi processor ! GetState expectMsg(List("a-1", "b-2", "c-3", "d-4", "e-5", "f-6")) } + "support single message deletions" in { + val deleteProbe = TestProbe() + + system.eventStream.subscribe(deleteProbe.ref, classOf[Delete]) + + val processor1 = namedProcessor[DeleteMessageTestProcessor] + processor1 ! Persistent("c") + processor1 ! Persistent("d") + processor1 ! Persistent("e") + processor1 ! Delete1(4) + deleteProbe.expectMsgType[Delete] + + val processor2 = namedProcessor[DeleteMessageTestProcessor] + processor2 ! GetState + + expectMsg(List("a-1", "b-2", "c-3", "e-5")) + } + "support bulk message deletions" in { + val deleteProbe = TestProbe() + + system.eventStream.subscribe(deleteProbe.ref, classOf[Delete]) + + val processor1 = namedProcessor[DeleteMessageTestProcessor] + processor1 ! Persistent("c") + processor1 ! Persistent("d") + processor1 ! Persistent("e") + processor1 ! DeleteN(4) + deleteProbe.expectMsgType[Delete] + + val processor2 = namedProcessor[DeleteMessageTestProcessor] + processor2 ! GetState + + expectMsg(List("e-5")) + } } "A processor" can { diff --git a/akka-persistence/src/test/scala/akka/persistence/ProcessorStashSpec.scala b/akka-persistence/src/test/scala/akka/persistence/ProcessorStashSpec.scala index e9b9354db9..6f6976466e 100644 --- a/akka-persistence/src/test/scala/akka/persistence/ProcessorStashSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/ProcessorStashSpec.scala @@ -38,7 +38,7 @@ object ProcessorStashSpec { class RecoveryFailureStashingProcessor(name: String) extends StashingProcessor(name) { override def preRestart(reason: Throwable, message: Option[Any]) = { message match { - case Some(m: Persistent) ⇒ if (recoveryRunning) deleteMessage(m) + case Some(m: Persistent) ⇒ if (recoveryRunning) deleteMessage(m.sequenceNr) case _ ⇒ } super.preRestart(reason, message) diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala index f17a59243b..918b9b0f79 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala @@ -28,10 +28,22 @@ object SnapshotSpec { } override def preStart() = () } + + case class Delete1(metadata: SnapshotMetadata) + case class DeleteN(criteria: SnapshotSelectionCriteria) + + class DeleteSnapshotTestProcessor(name: String, probe: ActorRef) extends LoadSnapshotTestProcessor(name, probe) { + override def receive = { + case Delete1(metadata) ⇒ deleteSnapshot(metadata.sequenceNr, metadata.timestamp) + case DeleteN(criteria) ⇒ deleteSnapshots(criteria) + case other ⇒ super.receive(other) + } + } } class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "snapshot")) with PersistenceSpec with ImplicitSender { import SnapshotSpec._ + import SnapshotProtocol._ override protected def beforeEach() { super.beforeEach() @@ -134,5 +146,68 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "snapshot" expectMsg("b-2") expectMsg("c-3") } + "support single message deletions" in { + val deleteProbe = TestProbe() + + val processor1 = system.actorOf(Props(classOf[DeleteSnapshotTestProcessor], name, testActor)) + val processorId = name + + system.eventStream.subscribe(deleteProbe.ref, classOf[DeleteSnapshot]) + + // recover processor from 3rd snapshot and then delete snapshot + processor1 ! Recover(toSequenceNr = 4) + processor1 ! "done" + + val metadata = expectMsgPF() { + case (md @ SnapshotMetadata(`processorId`, 4, _), state) ⇒ { + state must be(List("a-1", "b-2", "c-3", "d-4").reverse) + md + } + } + expectMsg("done") + + processor1 ! Delete1(metadata) + deleteProbe.expectMsgType[DeleteSnapshot] + + // recover processor from 2nd snapshot (3rd was deleted) plus replayed messages + val processor2 = system.actorOf(Props(classOf[DeleteSnapshotTestProcessor], name, testActor)) + + processor2 ! Recover(toSequenceNr = 4) + expectMsgPF() { + case (md @ SnapshotMetadata(`processorId`, 2, _), state) ⇒ { + state must be(List("a-1", "b-2").reverse) + md + } + } + expectMsg("c-3") + expectMsg("d-4") + } + "support bulk message deletions" in { + val deleteProbe = TestProbe() + + val processor1 = system.actorOf(Props(classOf[DeleteSnapshotTestProcessor], name, testActor)) + val processorId = name + + system.eventStream.subscribe(deleteProbe.ref, classOf[DeleteSnapshots]) + + // recover processor and the delete first three (= all) snapshots + processor1 ! Recover(toSequenceNr = 4) + processor1 ! DeleteN(SnapshotSelectionCriteria(maxSequenceNr = 4)) + expectMsgPF() { + case (md @ SnapshotMetadata(`processorId`, 4, _), state) ⇒ { + state must be(List("a-1", "b-2", "c-3", "d-4").reverse) + } + } + deleteProbe.expectMsgType[DeleteSnapshots] + + // recover processor from replayed messages (all snapshots deleted) + val processor2 = system.actorOf(Props(classOf[DeleteSnapshotTestProcessor], name, testActor)) + + processor2 ! Recover(toSequenceNr = 4) + expectMsg("a-1") + expectMsg("b-2") + expectMsg("c-3") + expectMsg("d-4") + } } } diff --git a/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/ProcessorFailureExample.java b/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/ProcessorFailureExample.java index bdf9baf2af..e960fc0f1d 100644 --- a/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/ProcessorFailureExample.java +++ b/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/ProcessorFailureExample.java @@ -34,7 +34,7 @@ public class ProcessorFailureExample { @Override public void preRestart(Throwable reason, Option message) { if (message.isDefined() && message.get() instanceof Persistent) { - deleteMessage((Persistent) message.get()); + deleteMessage(((Persistent) message.get()).sequenceNr(), false); } super.preRestart(reason, message); } diff --git a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ProcessorFailureExample.scala b/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ProcessorFailureExample.scala index 8ca24dc3d9..5cbc29bdd2 100644 --- a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ProcessorFailureExample.scala +++ b/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ProcessorFailureExample.scala @@ -20,7 +20,7 @@ object ProcessorFailureExample extends App { override def preRestart(reason: Throwable, message: Option[Any]) { message match { - case Some(p: Persistent) if !recoveryRunning ⇒ deleteMessage(p) // mark failing message as deleted + case Some(p: Persistent) if !recoveryRunning ⇒ deleteMessage(p.sequenceNr) // mark failing message as deleted case _ ⇒ // ignore } super.preRestart(reason, message)