!per #17799 Remove support for non-permanent deletes

* The permanent flag in deleteMessages
* old records stored with deletion flag are still not
  replayed
This commit is contained in:
Patrik Nordwall 2015-06-25 07:44:52 +02:00
parent 8c47e01e9d
commit 6eea0ddae6
26 changed files with 65 additions and 104 deletions

View file

@ -90,7 +90,7 @@ public class PersistencePluginDocTest {
}
@Override
public Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent) {
public Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr) {
return null;
}

View file

@ -323,11 +323,6 @@ Message deletion
To delete all messages (journaled by a single persistent actor) up to a specified sequence number,
persistent actors may call the ``deleteMessages`` method.
An optional ``permanent`` parameter specifies whether the message shall be permanently
deleted from the journal or only marked as deleted. In both cases, the message won't be replayed. Later extensions
to Akka persistence will allow to replay messages that have been marked as deleted which can be useful for debugging
purposes, for example.
.. _persistent-views-java-lambda:
Views

View file

@ -326,11 +326,6 @@ Message deletion
To delete all messages (journaled by a single persistent actor) up to a specified sequence number,
persistent actors may call the ``deleteMessages`` method.
An optional ``permanent`` parameter specifies whether the message shall be permanently
deleted from the journal or only marked as deleted. In both cases, the message won't be replayed. Later extensions
to Akka persistence will allow to replay messages that have been marked as deleted which can be useful for debugging
purposes, for example.
.. _persistent-views-java:
Persistent Views

View file

@ -317,6 +317,12 @@ The ``persist`` method that takes a ``Seq`` (Scala) or ``Iterable`` (Java) of ev
renamed to ``persistAll`` to avoid mistakes of persisting other collection types as one single event by calling
the overloaded ``persist(event)`` method.
non-permanent deletion
----------------------
The ``permanent`` flag in ``deleteMessages`` was removed. non-permanent deletes are not supported
any more.
Persistence Plugin APIs
=======================
@ -423,3 +429,9 @@ asyncReplayMessages Java API
The signature of `asyncReplayMessages` in the Java API changed from ``akka.japi.Procedure``
to ``java.util.function.Consumer``.
asyncDeleteMessagesTo
---------------------
The ``permanent`` deletion flag was removed. Support for non-permanent deletions was
removed.

View file

@ -128,8 +128,7 @@ trait SharedLeveldbPluginDocSpec {
class MyJournal extends AsyncWriteJournal {
def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = ???
def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long,
permanent: Boolean): Future[Unit] = ???
def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = ???
def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long,
toSequenceNr: Long, max: Long)(
replayCallback: (PersistentRepr) => Unit): Future[Unit] = ???

View file

@ -315,11 +315,6 @@ Message deletion
To delete all messages (journaled by a single persistent actor) up to a specified sequence number,
persistent actors may call the ``deleteMessages`` method.
An optional ``permanent`` parameter specifies whether the message shall be permanently
deleted from the journal or only marked as deleted. In both cases, the message won't be replayed. Later extensions
to Akka persistence will allow to replay messages that have been marked as deleted which can be useful for debugging
purposes, for example.
.. _persistent-views:
Persistent Views

View file

@ -130,7 +130,7 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) {
receiverProbe.expectMsg(ReplayMessagesSuccess)
}
"not replay permanently deleted messages (range deletion)" in {
val cmd = DeleteMessagesTo(pid, 3, true)
val cmd = DeleteMessagesTo(pid, 3)
val sub = TestProbe()
subscribe[DeleteMessagesTo](sub.ref)
@ -140,20 +140,6 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) {
journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref)
List(4, 5) foreach { i receiverProbe.expectMsg(replayedMessage(i)) }
}
"replay logically deleted messages with deleted field set to true (range deletion)" in {
val cmd = DeleteMessagesTo(pid, 3, false)
val sub = TestProbe()
subscribe[DeleteMessagesTo](sub.ref)
journal ! cmd
sub.expectMsg(cmd)
journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref, replayDeleted = true)
(1 to 5).foreach {
case i @ (1 | 2 | 3) receiverProbe.expectMsg(replayedMessage(i, deleted = true))
case i @ (4 | 5) receiverProbe.expectMsg(replayedMessage(i))
}
}
"return a highest stored sequence number > 0 if the persistent actor has already written messages and the message log is non-empty" in {
journal ! ReadHighestSequenceNr(3L, pid, receiverProbe.ref)

View file

@ -60,11 +60,10 @@ interface AsyncWritePlugin {
/**
* Java API, Plugin API: synchronously deletes all persistent messages up to
* `toSequenceNr`. If `permanent` is set to `false`, the persistent messages
* are marked as deleted, otherwise they are permanently deleted.
* `toSequenceNr`.
*
* @see AsyncRecoveryPlugin
*/
Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent);
Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr);
//#async-write-plugin-api
}

View file

@ -58,11 +58,10 @@ interface SyncWritePlugin {
/**
* Java API, Plugin API: synchronously deletes all persistent messages up to
* `toSequenceNr`. If `permanent` is set to `false`, the persistent messages
* are marked as deleted, otherwise they are permanently deleted.
* `toSequenceNr`.
*
* @see AsyncRecoveryPlugin
*/
void doDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent);
void doDeleteMessagesTo(String persistenceId, long toSequenceNr);
//#sync-write-plugin-api
}

View file

@ -9,7 +9,7 @@ message PersistentMessage {
optional PersistentPayload payload = 1;
optional int64 sequenceNr = 2;
optional string persistenceId = 3;
optional bool deleted = 4;
optional bool deleted = 4; // not used in new records from 2.4
// optional int32 redeliveries = 6; // Removed in 2.4
// repeated string confirms = 7; // Removed in 2.4
// optional bool confirmable = 8; // Removed in 2.4

View file

@ -378,21 +378,8 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
*
* @param toSequenceNr upper sequence number bound of persistent messages to be deleted.
*/
def deleteMessages(toSequenceNr: Long): Unit = {
deleteMessages(toSequenceNr, permanent = true)
}
/**
* Deletes all persistent messages with sequence numbers less than or equal `toSequenceNr`. If `permanent`
* is set to `false`, the persistent messages are marked as deleted in the journal, otherwise
* they permanently deleted from the journal.
*
* @param toSequenceNr upper sequence number bound of persistent messages to be deleted.
* @param permanent if `false`, the message is marked as deleted, otherwise it is permanently deleted.
*/
def deleteMessages(toSequenceNr: Long, permanent: Boolean): Unit = {
journal ! DeleteMessagesTo(persistenceId, toSequenceNr, permanent)
}
def deleteMessages(toSequenceNr: Long): Unit =
deleteMessages(toSequenceNr)
/**
* Returns `true` if this persistent actor is currently recovering.

View file

@ -30,10 +30,9 @@ private[persistence] object JournalProtocol {
/**
* Request to delete all persistent messages with sequence numbers up to `toSequenceNr`
* (inclusive). If `permanent` is set to `false`, the persistent messages are marked
* as deleted in the journal, otherwise they are permanently deleted from the journal.
* (inclusive).
*/
final case class DeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean)
final case class DeleteMessagesTo(persistenceId: String, toSequenceNr: Long)
extends Request
/**
@ -107,10 +106,9 @@ private[persistence] object JournalProtocol {
* @param max maximum number of messages to be replayed.
* @param persistenceId requesting persistent actor id.
* @param persistentActor requesting persistent actor.
* @param replayDeleted `true` if messages marked as deleted shall be replayed.
*/
final case class ReplayMessages(fromSequenceNr: Long, toSequenceNr: Long, max: Long, persistenceId: String, persistentActor: ActorRef, replayDeleted: Boolean = false)
extends Request
final case class ReplayMessages(fromSequenceNr: Long, toSequenceNr: Long, max: Long,
persistenceId: String, persistentActor: ActorRef) extends Request
/**
* Reply message to a [[ReplayMessages]] request. A separate reply is sent to the requestor for each

View file

@ -82,7 +82,9 @@ trait PersistentRepr extends Message {
def withManifest(manifest: String): PersistentRepr
/**
* `true` if this message is marked as deleted.
* Not used in new records stored with Akka v2.4, but
* old records from v2.3 may have this as `true` if
* it was a non-permanent delete.
*/
def deleted: Boolean

View file

@ -92,11 +92,11 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
}
}
case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor, replayDeleted)
case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor)
// Send replayed messages and replay result to persistentActor directly. No need
// to resequence replayed messages relative to written and looped messages.
asyncReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max) { p
if (!p.deleted || replayDeleted)
if (!p.deleted) // old records from 2.3 may still have the deleted flag
adaptFromJournal(p).foreach { adaptedPersistentRepr
persistentActor.tell(ReplayedMessage(adaptedPersistentRepr), Actor.noSender)
}
@ -117,8 +117,8 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
case e ReadHighestSequenceNrFailure(e)
} pipeTo persistentActor
case d @ DeleteMessagesTo(persistenceId, toSequenceNr, permanent)
asyncDeleteMessagesTo(persistenceId, toSequenceNr, permanent) onComplete {
case d @ DeleteMessagesTo(persistenceId, toSequenceNr)
asyncDeleteMessagesTo(persistenceId, toSequenceNr) onComplete {
case Success(_) if (publish) context.system.eventStream.publish(d)
case Failure(e)
}
@ -165,10 +165,9 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
/**
* Plugin API: asynchronously deletes all persistent messages up to `toSequenceNr`
* (inclusive). If `permanent` is set to `false`, the persistent messages are marked
* as deleted, otherwise they are permanently deleted.
* (inclusive).
*/
def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit]
def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit]
/**
* Plugin API

View file

@ -43,8 +43,8 @@ private[persistence] trait AsyncWriteProxy extends AsyncWriteJournal with Stash
def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] =
(store ? WriteMessages(messages)).mapTo[immutable.Seq[Try[Unit]]]
def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit] =
(store ? DeleteMessagesTo(persistenceId, toSequenceNr, permanent)).mapTo[Unit]
def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] =
(store ? DeleteMessagesTo(persistenceId, toSequenceNr)).mapTo[Unit]
def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: PersistentRepr Unit): Future[Unit] = {
val replayCompletionPromise = Promise[Unit]()
@ -72,7 +72,7 @@ private[persistence] object AsyncWriteTarget {
final case class WriteMessages(messages: immutable.Seq[AtomicWrite])
@SerialVersionUID(1L)
final case class DeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean)
final case class DeleteMessagesTo(persistenceId: String, toSequenceNr: Long)
@SerialVersionUID(1L)
final case class ReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)

View file

@ -69,9 +69,9 @@ trait SyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery wi
throw e
}
case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor, replayDeleted)
case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor)
asyncReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max) { p
if (!p.deleted || replayDeleted)
if (!p.deleted) // old records from 2.3 may still have the deleted flag
adaptFromJournal(p).foreach { adaptedPersistentRepr
persistentActor.tell(ReplayedMessage(adaptedPersistentRepr), adaptedPersistentRepr.sender)
}
@ -90,8 +90,8 @@ trait SyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery wi
case e ReadHighestSequenceNrFailure(e)
} pipeTo persistentActor
case d @ DeleteMessagesTo(persistenceId, toSequenceNr, permanent)
Try(deleteMessagesTo(persistenceId, toSequenceNr, permanent)) match {
case d @ DeleteMessagesTo(persistenceId, toSequenceNr)
Try(deleteMessagesTo(persistenceId, toSequenceNr)) match {
case Success(_) if (publish) context.system.eventStream.publish(d)
case Failure(e)
}
@ -137,9 +137,8 @@ trait SyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery wi
/**
* Plugin API: synchronously deletes all persistent messages up to `toSequenceNr`
* (inclusive). If `permanent` is set to `false`, the persistent messages are marked
* as deleted, otherwise they are permanently deleted.
* (inclusive).
*/
def deleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Unit
def deleteMessagesTo(persistenceId: String, toSequenceNr: Long): Unit
//#journal-plugin-api
}

View file

@ -82,9 +82,7 @@ private[persistence] class InmemStore extends Actor with InmemMessages with Writ
Try(a.payload.foreach(add))
}
sender() ! results
case DeleteMessagesTo(pid, tsnr, false)
sender() ! (1L to tsnr foreach { snr update(pid, snr)(_.update(deleted = true)) })
case DeleteMessagesTo(pid, tsnr, true)
case DeleteMessagesTo(pid, tsnr)
sender() ! (1L to tsnr foreach { snr delete(pid, snr) })
case ReplayMessages(pid, fromSnr, toSnr, max)
read(pid, fromSnr, toSnr, max).foreach { sender() ! _ }

View file

@ -28,6 +28,6 @@ abstract class AsyncWriteJournal extends AsyncRecovery with SAsyncWriteJournal w
}(collection.breakOut)
}
final def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean) =
doAsyncDeleteMessagesTo(persistenceId, toSequenceNr, permanent).map(Unit.unbox)
final def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long) =
doAsyncDeleteMessagesTo(persistenceId, toSequenceNr).map(Unit.unbox)
}

View file

@ -26,6 +26,6 @@ abstract class SyncWriteJournal extends AsyncRecovery with SSyncWriteJournal wit
else successUnit
}(collection.breakOut)
final def deleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Unit =
doDeleteMessagesTo(persistenceId, toSequenceNr, permanent)
final def deleteMessagesTo(persistenceId: String, toSequenceNr: Long): Unit =
doDeleteMessagesTo(persistenceId, toSequenceNr)
}

View file

@ -47,7 +47,7 @@ private[persistence] trait LeveldbRecovery extends AsyncRecovery { this: Leveldb
val msg = persistentFromBytes(nextEntry.getValue)
val del = deletion(iter, nextKey)
if (ctr < max) {
replayCallback(msg.update(deleted = del))
if (!del) replayCallback(msg)
go(iter, nextKey, ctr + 1L, replayCallback)
}
}

View file

@ -44,7 +44,7 @@ private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with
Try(a.payload.foreach(message addToMessageBatch(message, batch)))
})
def deleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean) = withBatch { batch
def deleteMessagesTo(persistenceId: String, toSequenceNr: Long) = withBatch { batch
val nid = numericId(persistenceId)
// seek to first existing message
@ -55,8 +55,7 @@ private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with
}
fromSequenceNr to toSequenceNr foreach { sequenceNr
if (permanent) batch.delete(keyToBytes(Key(nid, sequenceNr, 0))) // TODO: delete deletion markers, if any.
else batch.put(keyToBytes(deletionKey(nid, sequenceNr)), Array.emptyByteArray)
batch.delete(keyToBytes(Key(nid, sequenceNr, 0)))
}
}
@ -114,7 +113,7 @@ class SharedLeveldbStore extends { val configPath = "akka.persistence.journal.le
def receive = {
case WriteMessages(msgs) sender() ! writeMessages(preparePersistentBatch(msgs))
case DeleteMessagesTo(pid, tsnr, permanent) sender() ! deleteMessagesTo(pid, tsnr, permanent)
case DeleteMessagesTo(pid, tsnr) sender() ! deleteMessagesTo(pid, tsnr)
case ReadHighestSequenceNr(pid, fromSequenceNr) sender() ! readHighestSequenceNr(numericId(pid))
case ReplayMessages(pid, fromSnr, toSnr, max)
Try(replayMessages(numericId(pid), fromSnr, toSnr, max)(p adaptFromJournal(p).foreach { sender() ! _ })) match {

View file

@ -133,7 +133,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
builder.setPayload(persistentPayloadBuilder(persistent.payload.asInstanceOf[AnyRef]))
builder.setSequenceNr(persistent.sequenceNr)
builder.setDeleted(persistent.deleted)
// deleted is not used in new records from 2.4
builder
}
@ -174,7 +174,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
persistentMessage.getSequenceNr,
if (persistentMessage.hasPersistenceId) persistentMessage.getPersistenceId else Undefined,
if (persistentMessage.hasManifest) persistentMessage.getManifest else Undefined,
persistentMessage.getDeleted,
if (persistentMessage.hasDeleted) persistentMessage.getDeleted else false,
if (persistentMessage.hasSender) system.provider.resolveActorRef(persistentMessage.getSender) else Actor.noSender)
}

View file

@ -283,9 +283,9 @@ abstract class PersistentViewSpec(config: Config) extends PersistenceSpec(config
viewProbe.expectMsg("replicated-c-3")
viewProbe.expectMsg("replicated-d-4")
replayProbe.expectMsgPF() { case ReplayMessages(1L, _, 2L, _, _, _) }
replayProbe.expectMsgPF() { case ReplayMessages(3L, _, 2L, _, _, _) }
replayProbe.expectMsgPF() { case ReplayMessages(5L, _, 2L, _, _, _) }
replayProbe.expectMsgPF() { case ReplayMessages(1L, _, 2L, _, _) }
replayProbe.expectMsgPF() { case ReplayMessages(3L, _, 2L, _, _) }
replayProbe.expectMsgPF() { case ReplayMessages(5L, _, 2L, _, _) }
}
"support context.become" in {
view = system.actorOf(Props(classOf[BecomingPersistentView], name, viewProbe.ref))

View file

@ -46,10 +46,9 @@ class ChaosJournal extends SyncWriteJournal {
SyncWriteJournal.successUnit
}
def deleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Unit = {
def deleteMessagesTo(persistenceId: String, toSequenceNr: Long): Unit = {
(1L to toSequenceNr).foreach { snr
if (permanent) update(persistenceId, snr)(_.update(deleted = true))
else del(persistenceId, snr)
del(persistenceId, snr)
}
}

View file

@ -145,7 +145,7 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) {
"A message serializer" when {
"not given a manifest" must {
"handle custom Persistent message serialization" in {
val persistent = PersistentRepr(MyPayload("a"), 13, "p1", "", true)
val persistent = PersistentRepr(MyPayload("a"), 13, "p1", "")
val serializer = serialization.findSerializerFor(persistent)
val bytes = serializer.toBinary(persistent)
@ -157,7 +157,7 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) {
"given a PersistentRepr manifest" must {
"handle custom Persistent message serialization" in {
val persistent = PersistentRepr(MyPayload("b"), 13, "p1", "", true)
val persistent = PersistentRepr(MyPayload("b"), 13, "p1", "")
val serializer = serialization.findSerializerFor(persistent)
val bytes = serializer.toBinary(persistent)
@ -169,7 +169,7 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) {
"given payload serializer with string manifest" must {
"handle serialization" in {
val persistent = PersistentRepr(MyPayload2("a", 17), 13, "p1", "", true)
val persistent = PersistentRepr(MyPayload2("a", 17), 13, "p1", "")
val serializer = serialization.findSerializerFor(persistent)
val bytes = serializer.toBinary(persistent)
@ -192,7 +192,7 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) {
}
"be able to deserialize data when class is removed" in {
val serializer = serialization.findSerializerFor(PersistentRepr("x", 13, "p1", "", true))
val serializer = serialization.findSerializerFor(PersistentRepr("x", 13, "p1", ""))
// It was created with:
// val old = PersistentRepr(OldPayload('A'), 13, "p1", true, testActor)

View file

@ -83,7 +83,7 @@ public class LambdaPersistencePluginDocTest {
}
@Override
public Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent) {
public Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr) {
return null;
}