diff --git a/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java index 5d4d57aeef..8540e33c15 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java @@ -90,7 +90,7 @@ public class PersistencePluginDocTest { } @Override - public Future doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent) { + public Future doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr) { return null; } diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index cd1987df6a..af84bc61a8 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -323,11 +323,6 @@ Message deletion To delete all messages (journaled by a single persistent actor) up to a specified sequence number, persistent actors may call the ``deleteMessages`` method. -An optional ``permanent`` parameter specifies whether the message shall be permanently -deleted from the journal or only marked as deleted. In both cases, the message won't be replayed. Later extensions -to Akka persistence will allow to replay messages that have been marked as deleted which can be useful for debugging -purposes, for example. - .. _persistent-views-java-lambda: Views diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 119c08b171..d7710d8604 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -326,11 +326,6 @@ Message deletion To delete all messages (journaled by a single persistent actor) up to a specified sequence number, persistent actors may call the ``deleteMessages`` method. -An optional ``permanent`` parameter specifies whether the message shall be permanently -deleted from the journal or only marked as deleted. In both cases, the message won't be replayed. Later extensions -to Akka persistence will allow to replay messages that have been marked as deleted which can be useful for debugging -purposes, for example. - .. _persistent-views-java: Persistent Views diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst index 64dab14be4..e0577a65ae 100644 --- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst @@ -317,6 +317,12 @@ The ``persist`` method that takes a ``Seq`` (Scala) or ``Iterable`` (Java) of ev renamed to ``persistAll`` to avoid mistakes of persisting other collection types as one single event by calling the overloaded ``persist(event)`` method. +non-permanent deletion +---------------------- + +The ``permanent`` flag in ``deleteMessages`` was removed. non-permanent deletes are not supported +any more. + Persistence Plugin APIs ======================= @@ -423,3 +429,9 @@ asyncReplayMessages Java API The signature of `asyncReplayMessages` in the Java API changed from ``akka.japi.Procedure`` to ``java.util.function.Consumer``. +asyncDeleteMessagesTo +--------------------- + +The ``permanent`` deletion flag was removed. Support for non-permanent deletions was +removed. + diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala index b25b51225d..b0b12ab6b8 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala @@ -128,8 +128,7 @@ trait SharedLeveldbPluginDocSpec { class MyJournal extends AsyncWriteJournal { def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = ??? - def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, - permanent: Boolean): Future[Unit] = ??? + def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = ??? def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)( replayCallback: (PersistentRepr) => Unit): Future[Unit] = ??? diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 598f88fbec..d19bab7a91 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -315,11 +315,6 @@ Message deletion To delete all messages (journaled by a single persistent actor) up to a specified sequence number, persistent actors may call the ``deleteMessages`` method. -An optional ``permanent`` parameter specifies whether the message shall be permanently -deleted from the journal or only marked as deleted. In both cases, the message won't be replayed. Later extensions -to Akka persistence will allow to replay messages that have been marked as deleted which can be useful for debugging -purposes, for example. - .. _persistent-views: Persistent Views diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala index 28c0b9c5c3..39a649b408 100644 --- a/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala +++ b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala @@ -130,7 +130,7 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) { receiverProbe.expectMsg(ReplayMessagesSuccess) } "not replay permanently deleted messages (range deletion)" in { - val cmd = DeleteMessagesTo(pid, 3, true) + val cmd = DeleteMessagesTo(pid, 3) val sub = TestProbe() subscribe[DeleteMessagesTo](sub.ref) @@ -140,20 +140,6 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) { journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref) List(4, 5) foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) } } - "replay logically deleted messages with deleted field set to true (range deletion)" in { - val cmd = DeleteMessagesTo(pid, 3, false) - val sub = TestProbe() - - subscribe[DeleteMessagesTo](sub.ref) - journal ! cmd - sub.expectMsg(cmd) - - journal ! ReplayMessages(1, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref, replayDeleted = true) - (1 to 5).foreach { - case i @ (1 | 2 | 3) ⇒ receiverProbe.expectMsg(replayedMessage(i, deleted = true)) - case i @ (4 | 5) ⇒ receiverProbe.expectMsg(replayedMessage(i)) - } - } "return a highest stored sequence number > 0 if the persistent actor has already written messages and the message log is non-empty" in { journal ! ReadHighestSequenceNr(3L, pid, receiverProbe.ref) diff --git a/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java b/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java index 4080d53fe4..0da067fd87 100644 --- a/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java +++ b/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java @@ -60,11 +60,10 @@ interface AsyncWritePlugin { /** * Java API, Plugin API: synchronously deletes all persistent messages up to - * `toSequenceNr`. If `permanent` is set to `false`, the persistent messages - * are marked as deleted, otherwise they are permanently deleted. + * `toSequenceNr`. * * @see AsyncRecoveryPlugin */ - Future doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent); + Future doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr); //#async-write-plugin-api } diff --git a/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java b/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java index 11033ef76a..964b94ade7 100644 --- a/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java +++ b/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java @@ -58,11 +58,10 @@ interface SyncWritePlugin { /** * Java API, Plugin API: synchronously deletes all persistent messages up to - * `toSequenceNr`. If `permanent` is set to `false`, the persistent messages - * are marked as deleted, otherwise they are permanently deleted. + * `toSequenceNr`. * * @see AsyncRecoveryPlugin */ - void doDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent); + void doDeleteMessagesTo(String persistenceId, long toSequenceNr); //#sync-write-plugin-api } diff --git a/akka-persistence/src/main/protobuf/MessageFormats.proto b/akka-persistence/src/main/protobuf/MessageFormats.proto index 72f96ab9d2..34e4e0f577 100644 --- a/akka-persistence/src/main/protobuf/MessageFormats.proto +++ b/akka-persistence/src/main/protobuf/MessageFormats.proto @@ -9,7 +9,7 @@ message PersistentMessage { optional PersistentPayload payload = 1; optional int64 sequenceNr = 2; optional string persistenceId = 3; - optional bool deleted = 4; + optional bool deleted = 4; // not used in new records from 2.4 // optional int32 redeliveries = 6; // Removed in 2.4 // repeated string confirms = 7; // Removed in 2.4 // optional bool confirmable = 8; // Removed in 2.4 diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 37aa394be4..65c07079b6 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -378,21 +378,8 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas * * @param toSequenceNr upper sequence number bound of persistent messages to be deleted. */ - def deleteMessages(toSequenceNr: Long): Unit = { - deleteMessages(toSequenceNr, permanent = true) - } - - /** - * Deletes all persistent messages with sequence numbers less than or equal `toSequenceNr`. If `permanent` - * is set to `false`, the persistent messages are marked as deleted in the journal, otherwise - * they permanently deleted from the journal. - * - * @param toSequenceNr upper sequence number bound of persistent messages to be deleted. - * @param permanent if `false`, the message is marked as deleted, otherwise it is permanently deleted. - */ - def deleteMessages(toSequenceNr: Long, permanent: Boolean): Unit = { - journal ! DeleteMessagesTo(persistenceId, toSequenceNr, permanent) - } + def deleteMessages(toSequenceNr: Long): Unit = + deleteMessages(toSequenceNr) /** * Returns `true` if this persistent actor is currently recovering. diff --git a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala index ce694e9b8d..f8004ba80f 100644 --- a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala +++ b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala @@ -30,10 +30,9 @@ private[persistence] object JournalProtocol { /** * Request to delete all persistent messages with sequence numbers up to `toSequenceNr` - * (inclusive). If `permanent` is set to `false`, the persistent messages are marked - * as deleted in the journal, otherwise they are permanently deleted from the journal. + * (inclusive). */ - final case class DeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean) + final case class DeleteMessagesTo(persistenceId: String, toSequenceNr: Long) extends Request /** @@ -107,10 +106,9 @@ private[persistence] object JournalProtocol { * @param max maximum number of messages to be replayed. * @param persistenceId requesting persistent actor id. * @param persistentActor requesting persistent actor. - * @param replayDeleted `true` if messages marked as deleted shall be replayed. */ - final case class ReplayMessages(fromSequenceNr: Long, toSequenceNr: Long, max: Long, persistenceId: String, persistentActor: ActorRef, replayDeleted: Boolean = false) - extends Request + final case class ReplayMessages(fromSequenceNr: Long, toSequenceNr: Long, max: Long, + persistenceId: String, persistentActor: ActorRef) extends Request /** * Reply message to a [[ReplayMessages]] request. A separate reply is sent to the requestor for each diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala index 4d5989e8b6..29919c155f 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala @@ -82,7 +82,9 @@ trait PersistentRepr extends Message { def withManifest(manifest: String): PersistentRepr /** - * `true` if this message is marked as deleted. + * Not used in new records stored with Akka v2.4, but + * old records from v2.3 may have this as `true` if + * it was a non-permanent delete. */ def deleted: Boolean diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index 547ac01ffe..a45fc5156b 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -92,11 +92,11 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { } } - case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor, replayDeleted) ⇒ + case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor) ⇒ // Send replayed messages and replay result to persistentActor directly. No need // to resequence replayed messages relative to written and looped messages. asyncReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max) { p ⇒ - if (!p.deleted || replayDeleted) + if (!p.deleted) // old records from 2.3 may still have the deleted flag adaptFromJournal(p).foreach { adaptedPersistentRepr ⇒ persistentActor.tell(ReplayedMessage(adaptedPersistentRepr), Actor.noSender) } @@ -117,8 +117,8 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { case e ⇒ ReadHighestSequenceNrFailure(e) } pipeTo persistentActor - case d @ DeleteMessagesTo(persistenceId, toSequenceNr, permanent) ⇒ - asyncDeleteMessagesTo(persistenceId, toSequenceNr, permanent) onComplete { + case d @ DeleteMessagesTo(persistenceId, toSequenceNr) ⇒ + asyncDeleteMessagesTo(persistenceId, toSequenceNr) onComplete { case Success(_) ⇒ if (publish) context.system.eventStream.publish(d) case Failure(e) ⇒ } @@ -165,10 +165,9 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { /** * Plugin API: asynchronously deletes all persistent messages up to `toSequenceNr` - * (inclusive). If `permanent` is set to `false`, the persistent messages are marked - * as deleted, otherwise they are permanently deleted. + * (inclusive). */ - def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit] + def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] /** * Plugin API diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala index d2950ca41a..dba13ebe3e 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala @@ -43,8 +43,8 @@ private[persistence] trait AsyncWriteProxy extends AsyncWriteJournal with Stash def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = (store ? WriteMessages(messages)).mapTo[immutable.Seq[Try[Unit]]] - def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit] = - (store ? DeleteMessagesTo(persistenceId, toSequenceNr, permanent)).mapTo[Unit] + def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = + (store ? DeleteMessagesTo(persistenceId, toSequenceNr)).mapTo[Unit] def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: PersistentRepr ⇒ Unit): Future[Unit] = { val replayCompletionPromise = Promise[Unit]() @@ -72,7 +72,7 @@ private[persistence] object AsyncWriteTarget { final case class WriteMessages(messages: immutable.Seq[AtomicWrite]) @SerialVersionUID(1L) - final case class DeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean) + final case class DeleteMessagesTo(persistenceId: String, toSequenceNr: Long) @SerialVersionUID(1L) final case class ReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala index 4519de45fe..0cb27a91fe 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala @@ -69,9 +69,9 @@ trait SyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery wi throw e } - case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor, replayDeleted) ⇒ + case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor) ⇒ asyncReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max) { p ⇒ - if (!p.deleted || replayDeleted) + if (!p.deleted) // old records from 2.3 may still have the deleted flag adaptFromJournal(p).foreach { adaptedPersistentRepr ⇒ persistentActor.tell(ReplayedMessage(adaptedPersistentRepr), adaptedPersistentRepr.sender) } @@ -90,8 +90,8 @@ trait SyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery wi case e ⇒ ReadHighestSequenceNrFailure(e) } pipeTo persistentActor - case d @ DeleteMessagesTo(persistenceId, toSequenceNr, permanent) ⇒ - Try(deleteMessagesTo(persistenceId, toSequenceNr, permanent)) match { + case d @ DeleteMessagesTo(persistenceId, toSequenceNr) ⇒ + Try(deleteMessagesTo(persistenceId, toSequenceNr)) match { case Success(_) ⇒ if (publish) context.system.eventStream.publish(d) case Failure(e) ⇒ } @@ -137,9 +137,8 @@ trait SyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery wi /** * Plugin API: synchronously deletes all persistent messages up to `toSequenceNr` - * (inclusive). If `permanent` is set to `false`, the persistent messages are marked - * as deleted, otherwise they are permanently deleted. + * (inclusive). */ - def deleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Unit + def deleteMessagesTo(persistenceId: String, toSequenceNr: Long): Unit //#journal-plugin-api } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala index 90375d8aee..7418d55a95 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala @@ -82,9 +82,7 @@ private[persistence] class InmemStore extends Actor with InmemMessages with Writ Try(a.payload.foreach(add)) } sender() ! results - case DeleteMessagesTo(pid, tsnr, false) ⇒ - sender() ! (1L to tsnr foreach { snr ⇒ update(pid, snr)(_.update(deleted = true)) }) - case DeleteMessagesTo(pid, tsnr, true) ⇒ + case DeleteMessagesTo(pid, tsnr) ⇒ sender() ! (1L to tsnr foreach { snr ⇒ delete(pid, snr) }) case ReplayMessages(pid, fromSnr, toSnr, max) ⇒ read(pid, fromSnr, toSnr, max).foreach { sender() ! _ } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala index a6ac24f8c7..f0507fab0f 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala @@ -28,6 +28,6 @@ abstract class AsyncWriteJournal extends AsyncRecovery with SAsyncWriteJournal w }(collection.breakOut) } - final def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean) = - doAsyncDeleteMessagesTo(persistenceId, toSequenceNr, permanent).map(Unit.unbox) + final def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long) = + doAsyncDeleteMessagesTo(persistenceId, toSequenceNr).map(Unit.unbox) } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala index 04e89232ca..eb1fb26016 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala @@ -26,6 +26,6 @@ abstract class SyncWriteJournal extends AsyncRecovery with SSyncWriteJournal wit else successUnit }(collection.breakOut) - final def deleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Unit = - doDeleteMessagesTo(persistenceId, toSequenceNr, permanent) + final def deleteMessagesTo(persistenceId: String, toSequenceNr: Long): Unit = + doDeleteMessagesTo(persistenceId, toSequenceNr) } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbRecovery.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbRecovery.scala index af6b2766af..1ac1af8bd6 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbRecovery.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbRecovery.scala @@ -47,7 +47,7 @@ private[persistence] trait LeveldbRecovery extends AsyncRecovery { this: Leveldb val msg = persistentFromBytes(nextEntry.getValue) val del = deletion(iter, nextKey) if (ctr < max) { - replayCallback(msg.update(deleted = del)) + if (!del) replayCallback(msg) go(iter, nextKey, ctr + 1L, replayCallback) } } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala index 0b934e11a1..d09b21c789 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala @@ -44,7 +44,7 @@ private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with Try(a.payload.foreach(message ⇒ addToMessageBatch(message, batch))) }) - def deleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean) = withBatch { batch ⇒ + def deleteMessagesTo(persistenceId: String, toSequenceNr: Long) = withBatch { batch ⇒ val nid = numericId(persistenceId) // seek to first existing message @@ -55,8 +55,7 @@ private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with } fromSequenceNr to toSequenceNr foreach { sequenceNr ⇒ - if (permanent) batch.delete(keyToBytes(Key(nid, sequenceNr, 0))) // TODO: delete deletion markers, if any. - else batch.put(keyToBytes(deletionKey(nid, sequenceNr)), Array.emptyByteArray) + batch.delete(keyToBytes(Key(nid, sequenceNr, 0))) } } @@ -114,7 +113,7 @@ class SharedLeveldbStore extends { val configPath = "akka.persistence.journal.le def receive = { case WriteMessages(msgs) ⇒ sender() ! writeMessages(preparePersistentBatch(msgs)) - case DeleteMessagesTo(pid, tsnr, permanent) ⇒ sender() ! deleteMessagesTo(pid, tsnr, permanent) + case DeleteMessagesTo(pid, tsnr) ⇒ sender() ! deleteMessagesTo(pid, tsnr) case ReadHighestSequenceNr(pid, fromSequenceNr) ⇒ sender() ! readHighestSequenceNr(numericId(pid)) case ReplayMessages(pid, fromSnr, toSnr, max) ⇒ Try(replayMessages(numericId(pid), fromSnr, toSnr, max)(p ⇒ adaptFromJournal(p).foreach { sender() ! _ })) match { diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala index bbbf9d9cb2..63a2238ae3 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala @@ -133,7 +133,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer builder.setPayload(persistentPayloadBuilder(persistent.payload.asInstanceOf[AnyRef])) builder.setSequenceNr(persistent.sequenceNr) - builder.setDeleted(persistent.deleted) + // deleted is not used in new records from 2.4 builder } @@ -174,7 +174,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer persistentMessage.getSequenceNr, if (persistentMessage.hasPersistenceId) persistentMessage.getPersistenceId else Undefined, if (persistentMessage.hasManifest) persistentMessage.getManifest else Undefined, - persistentMessage.getDeleted, + if (persistentMessage.hasDeleted) persistentMessage.getDeleted else false, if (persistentMessage.hasSender) system.provider.resolveActorRef(persistentMessage.getSender) else Actor.noSender) } diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala index b84bc636a3..a16b26680b 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala @@ -283,9 +283,9 @@ abstract class PersistentViewSpec(config: Config) extends PersistenceSpec(config viewProbe.expectMsg("replicated-c-3") viewProbe.expectMsg("replicated-d-4") - replayProbe.expectMsgPF() { case ReplayMessages(1L, _, 2L, _, _, _) ⇒ } - replayProbe.expectMsgPF() { case ReplayMessages(3L, _, 2L, _, _, _) ⇒ } - replayProbe.expectMsgPF() { case ReplayMessages(5L, _, 2L, _, _, _) ⇒ } + replayProbe.expectMsgPF() { case ReplayMessages(1L, _, 2L, _, _) ⇒ } + replayProbe.expectMsgPF() { case ReplayMessages(3L, _, 2L, _, _) ⇒ } + replayProbe.expectMsgPF() { case ReplayMessages(5L, _, 2L, _, _) ⇒ } } "support context.become" in { view = system.actorOf(Props(classOf[BecomingPersistentView], name, viewProbe.ref)) diff --git a/akka-persistence/src/test/scala/akka/persistence/journal/chaos/ChaosJournal.scala b/akka-persistence/src/test/scala/akka/persistence/journal/chaos/ChaosJournal.scala index 915c633775..66dad14957 100644 --- a/akka-persistence/src/test/scala/akka/persistence/journal/chaos/ChaosJournal.scala +++ b/akka-persistence/src/test/scala/akka/persistence/journal/chaos/ChaosJournal.scala @@ -46,10 +46,9 @@ class ChaosJournal extends SyncWriteJournal { SyncWriteJournal.successUnit } - def deleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Unit = { + def deleteMessagesTo(persistenceId: String, toSequenceNr: Long): Unit = { (1L to toSequenceNr).foreach { snr ⇒ - if (permanent) update(persistenceId, snr)(_.update(deleted = true)) - else del(persistenceId, snr) + del(persistenceId, snr) } } diff --git a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala index 009a16005c..237878c3de 100644 --- a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala @@ -145,7 +145,7 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) { "A message serializer" when { "not given a manifest" must { "handle custom Persistent message serialization" in { - val persistent = PersistentRepr(MyPayload("a"), 13, "p1", "", true) + val persistent = PersistentRepr(MyPayload("a"), 13, "p1", "") val serializer = serialization.findSerializerFor(persistent) val bytes = serializer.toBinary(persistent) @@ -157,7 +157,7 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) { "given a PersistentRepr manifest" must { "handle custom Persistent message serialization" in { - val persistent = PersistentRepr(MyPayload("b"), 13, "p1", "", true) + val persistent = PersistentRepr(MyPayload("b"), 13, "p1", "") val serializer = serialization.findSerializerFor(persistent) val bytes = serializer.toBinary(persistent) @@ -169,7 +169,7 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) { "given payload serializer with string manifest" must { "handle serialization" in { - val persistent = PersistentRepr(MyPayload2("a", 17), 13, "p1", "", true) + val persistent = PersistentRepr(MyPayload2("a", 17), 13, "p1", "") val serializer = serialization.findSerializerFor(persistent) val bytes = serializer.toBinary(persistent) @@ -192,7 +192,7 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) { } "be able to deserialize data when class is removed" in { - val serializer = serialization.findSerializerFor(PersistentRepr("x", 13, "p1", "", true)) + val serializer = serialization.findSerializerFor(PersistentRepr("x", 13, "p1", "")) // It was created with: // val old = PersistentRepr(OldPayload('A'), 13, "p1", true, testActor) diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java index 3d8506de38..dd70b39d8e 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java @@ -83,7 +83,7 @@ public class LambdaPersistencePluginDocTest { } @Override - public Future doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent) { + public Future doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr) { return null; }