From 33c7f6bb4fea4ad30ca4d4d7f3bc864ba8e8812f Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 27 Jun 2014 11:51:58 +0200 Subject: [PATCH] !per Change for journal plugin compatibility * A few more adjustments, found when verifying source compatibility of a few journal plugins * Note that PersistentId will be removed with deleteMessage and we should not break plugins because of that * Add missing section of at-least-once delivery in migration guide (cherry picked from commit 6727eac6d07280d277968e2e25db44e02be3b102) Conflicts: akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala akka-persistence/src/main/scala/akka/persistence/Persistent.scala akka-persistence/src/main/scala/akka/persistence/Snapshot.scala akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala --- .../persistence/PersistencePluginDocTest.java | 2 +- ...e-persistence-experimental-2.3.x-2.4.x.rst | 55 +++++++++---------- .../PersistencePluginDocSpec.scala | 2 +- .../journal/japi/AsyncWritePlugin.java | 2 +- .../journal/japi/SyncWritePlugin.java | 2 +- .../akka/persistence/JournalProtocol.scala | 4 +- .../scala/akka/persistence/Persistent.scala | 40 +++++++------- .../akka/persistence/PersistentChannel.scala | 6 +- .../scala/akka/persistence/Processor.scala | 2 +- .../scala/akka/persistence/Snapshot.scala | 15 ++++- .../journal/AsyncWriteJournal.scala | 2 +- .../persistence/journal/AsyncWriteProxy.scala | 4 +- .../journal/SyncWriteJournal.scala | 2 +- .../journal/inmem/InmemJournal.scala | 2 +- .../journal/japi/AsyncWriteJournal.scala | 2 +- .../journal/japi/SyncWriteJournal.scala | 2 +- .../journal/leveldb/LeveldbStore.scala | 2 +- .../journal/chaos/ChaosJournal.scala | 6 +- .../doc/LambdaPersistencePluginDocTest.java | 2 +- .../sample/persistence/SnapshotExample.java | 1 - 20 files changed, 81 insertions(+), 74 deletions(-) diff --git a/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java index 82dc3e9c42..fd4f826124 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java @@ -87,7 +87,7 @@ public class PersistencePluginDocTest { } @Override - public Future doAsyncDeleteMessages(Iterable messageIds, boolean permanent) { + public Future doAsyncDeleteMessages(Iterable messageIds, boolean permanent) { return null; } diff --git a/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst index 377d4ae38c..e584415d18 100644 --- a/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst @@ -27,6 +27,8 @@ To extend ``PersistentActor``:: /*...*/ } +Read more about the persistent actor in the :ref:`documentation for Scala ` and +:ref:`documentation for Java `. Changed processorId to (abstract) persistenceId =============================================== @@ -36,7 +38,7 @@ Persistent messages, as well as processors implemented the ``processorId`` metho This concept remains the same in Akka ``2.3.4``, yet we rename ``processorId`` to ``persistenceId`` because Processors will be removed, and persistent messages can be used from different classes not only ``PersistentActor`` (Views, directly from Journals etc). -Please note that ``processorId`` is **abstract** in the new API classes (``PersistentActor`` and ``PersistentView``), +Please note that ``persistenceId`` is **abstract** in the new API classes (``PersistentActor`` and ``PersistentView``), and we do **not** provide a default (actor-path derrived) value for it like we did for ``processorId``. The rationale behind this change being stricter de-coupling of your Actor hierarchy and the logical "which persistent entity this actor represents". A longer discussion on this subject can be found on `issue #15436 `_ on github. @@ -49,32 +51,6 @@ implement it yourself either as a helper trait or simply by overriding ``persist We provided the renamed method also on already deprecated classes (Channels), so you can simply apply a global rename of ``processorId`` to ``persistenceId``. -Plugin APIs: Renamed PersistentId to PersistenceId -================================================== -Following the removal of Processors and moving to ``persistenceId``, the plugin SPI visible type has changed. -The move from ``2.3.3`` to ``2.3.4`` should be relatively painless, and plugins will work even when using the deprecated ``PersistentId`` type. - -Change your implementations from:: - - def asyncWriteMessages(messages: immutable.Seq[PersistentRepr]): Future[Unit] = // ... - - def asyncDeleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean): Future[Unit] = { - val p = messageIds.head.processorId // old - // ... - } - -to:: - - def asyncWriteMessages(messages: immutable.Seq[PersistentRepr]): Future[Unit] = // ... - - def asyncDeleteMessages(messageIds: immutable.Seq[PersistenceId], permanent: Boolean): Future[Unit] = { - val p = messageIds.head.persistenceId // new - // ... - } - -Plugins written for ``2.3.3`` are source level compatible with ``2.3.4``, using the deprecated types, but will not work with future releases. -Plugin maintainers are asked to update their plugins to ``2.3.4`` as soon as possible. - Removed Processor in favour of extending PersistentActor with persistAsync ========================================================================== @@ -110,7 +86,7 @@ Replacement code, with the same semantics, using PersistentActor:: persistAsync(cmd) { e => sender() ! e } } - def receiveEvent = { + def receiveRecover = { case _ => // logic for handling replay } } @@ -118,7 +94,7 @@ Replacement code, with the same semantics, using PersistentActor:: It is worth pointing out that using ``sender()`` inside the persistAsync callback block is **valid**, and does *not* suffer any of the problems Futures have when closing over the sender reference. -Using the``PersistentActor`` instead of ``Processor`` also shifts the responsibility of deciding if a message should be persisted +Using the ``PersistentActor`` instead of ``Processor`` also shifts the responsibility of deciding if a message should be persisted to the receiver instead of the sender of the message. Previously, using ``Processor``, clients would have to wrap messages as ``Persistent(cmd)`` manually, as well as have to be aware of the receiver being a ``Processor``, which didn't play well with transparency of the ActorRefs in general. @@ -172,3 +148,24 @@ You should update it to extend ``PersistentView`` instead:: In case you need to obtain the current sequence number the view is looking at, you can use the ``lastSequenceNr`` method. It is equivalent to "current sequence number", when ``isPersistent`` returns true, otherwise it yields the sequence number of the last persistent message that this view was updated with. + +Removed Channel and PersistentChannel in favour of AtLeastOnceDelivery trait +============================================================================ + +One of the primary tasks of a ``Channel`` was to de-duplicate messages that were sent from a +``Processor`` during recovery. Performing external side effects during recovery is not +encouraged with event sourcing and therefore the ``Channel`` is not needed for this purpose. + +The ``Channel`` and ``PersistentChannel`` also performed at-least-once delivery of messages, +but it did not free a sending actor from implementing retransmission or timeouts, since the +acknowledgement from the channel is needed to guarantee safe hand-off. Therefore at-least-once +delivery is provided in a new ``AtLeastOnceDelivery`` trait that is mixed-in to the +persistent actor on the sending side. + +Read more about at-least-once delivery in the :ref:`documentation for Scala ` and +:ref:`documentation for Java `. + + + + + \ No newline at end of file diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala index 7a32f42010..98e8a5443e 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala @@ -126,7 +126,7 @@ trait SharedLeveldbPluginDocSpec { class MyJournal extends AsyncWriteJournal { def asyncWriteMessages(messages: Seq[PersistentRepr]): Future[Unit] = ??? def asyncWriteConfirmations(confirmations: Seq[PersistentConfirmation]): Future[Unit] = ??? - def asyncDeleteMessages(messageIds: Seq[PersistenceId], permanent: Boolean): Future[Unit] = ??? + def asyncDeleteMessages(messageIds: Seq[PersistentId], permanent: Boolean): Future[Unit] = ??? def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit] = ??? def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: (PersistentRepr) => Unit): Future[Unit] = ??? def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = ??? diff --git a/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java b/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java index 5dc163785b..00aa6376b4 100644 --- a/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java +++ b/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java @@ -32,7 +32,7 @@ interface AsyncWritePlugin { * * @deprecated doAsyncDeleteMessages will be removed (since 2.3.4) */ - @Deprecated Future doAsyncDeleteMessages(Iterable messageIds, boolean permanent); + @Deprecated Future doAsyncDeleteMessages(Iterable messageIds, boolean permanent); /** * Java API, Plugin API: synchronously deletes all persistent messages up to diff --git a/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java b/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java index 610fadfa61..fcaad3fd25 100644 --- a/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java +++ b/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java @@ -30,7 +30,7 @@ interface SyncWritePlugin { * * @deprecated doDeleteMessages will be removed (since 2.3.4) */ - @Deprecated void doDeleteMessages(Iterable messageIds, boolean permanent); + @Deprecated void doDeleteMessages(Iterable messageIds, boolean permanent); /** * Java API, Plugin API: synchronously deletes all persistent messages up to diff --git a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala index 7b8ac204bd..1438c0afc4 100644 --- a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala +++ b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala @@ -18,12 +18,12 @@ private[persistence] object JournalProtocol { * Request to delete messages identified by `messageIds`. If `permanent` is set to `false`, * the persistent messages are marked as deleted, otherwise they are permanently deleted. */ - final case class DeleteMessages(messageIds: immutable.Seq[PersistenceId], permanent: Boolean, requestor: Option[ActorRef] = None) + final case class DeleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean, requestor: Option[ActorRef] = None) /** * Reply message to a successful [[DeleteMessages]] request. */ - final case class DeleteMessagesSuccess(messageIds: immutable.Seq[PersistenceId]) + final case class DeleteMessagesSuccess(messageIds: immutable.Seq[PersistentId]) /** * Reply message to a failed [[DeleteMessages]] request. diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala index ade35a8c85..5b47692d79 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala @@ -130,7 +130,7 @@ object ConfirmablePersistent { * During recovery, they are also replayed individually. */ @deprecated("Use akka.persistence.PersistentActor instead", since = "2.3.4") -case class PersistentBatch(batch: immutable.Seq[Resequenceable]) extends Message +final case class PersistentBatch(batch: immutable.Seq[Resequenceable]) extends Message @deprecated("Use akka.persistence.PersistentActor instead", since = "2.3.4") object PersistentBatch { @@ -155,26 +155,19 @@ trait PersistentConfirmation { /** * Plugin API: persistent message identifier. - * - * Deprecated, please use [[PersistenceId]]. */ -@deprecated("Use PersistenceId instead.", since = "2.3.4") -trait PersistentId extends PersistenceId { - /** - * Persistent id that journals a persistent message - */ - @deprecated("Use `persistenceId` instead.", since = "2.3.4") - def processorId: String = persistenceId -} +@deprecated("deleteMessages will be removed.", since = "2.3.4") +trait PersistentId { -/** - * Plugin API: persistent message identifier. - */ -trait PersistenceId { /** * Persistent id that journals a persistent message */ - def persistenceId: String + def processorId: String + + /** + * Persistent id that journals a persistent message + */ + def persistenceId: String = processorId /** * A persistent message's sequence number. @@ -185,7 +178,8 @@ trait PersistenceId { /** * INTERNAL API. */ -private[persistence] final case class PersistenceIdImpl(persistenceId: String, sequenceNr: Long) extends PersistenceId +@deprecated("deleteMessages will be removed.", since = "2.3.4") +private[persistence] final case class PersistentIdImpl(processorId: String, sequenceNr: Long) extends PersistentId /** * Plugin API: representation of a persistent message in the journal plugin API. @@ -194,7 +188,7 @@ private[persistence] final case class PersistenceIdImpl(persistenceId: String, s * @see [[journal.AsyncWriteJournal]] * @see [[journal.AsyncRecovery]] */ -trait PersistentRepr extends Persistent with Resequenceable with PersistenceId with Message { +trait PersistentRepr extends Persistent with Resequenceable with PersistentId with Message { // todo we want to get rid of the Persistent() wrapper from user land; PersistentRepr is here to stay. #15230 import scala.collection.JavaConverters._ @@ -313,7 +307,7 @@ object PersistentRepr { private[persistence] final case class PersistentImpl( payload: Any, sequenceNr: Long, - @deprecatedName('processorId) persistenceId: String, + @deprecatedName('processorId) override val persistenceId: String, deleted: Boolean, confirms: immutable.Seq[String], sender: ActorRef) extends Persistent with PersistentRepr { @@ -339,6 +333,9 @@ private[persistence] final case class PersistentImpl( val confirmable: Boolean = false val confirmMessage: Delivered = null val confirmTarget: ActorRef = null + + @deprecated("Use persistenceId.", since = "2.3.4") + override def processorId = persistenceId } /** @@ -348,7 +345,7 @@ private[persistence] final case class PersistentImpl( private[persistence] final case class ConfirmablePersistentImpl( payload: Any, sequenceNr: Long, - @deprecatedName('processorId) persistenceId: String, + @deprecatedName('processorId) override val persistenceId: String, deleted: Boolean, redeliveries: Int, confirms: immutable.Seq[String], @@ -369,6 +366,9 @@ private[persistence] final case class ConfirmablePersistentImpl( def update(sequenceNr: Long, @deprecatedName('processorId) persistenceId: String, deleted: Boolean, redeliveries: Int, confirms: immutable.Seq[String], confirmMessage: Delivered, confirmTarget: ActorRef, sender: ActorRef) = copy(sequenceNr = sequenceNr, persistenceId = persistenceId, deleted = deleted, redeliveries = redeliveries, confirms = confirms, confirmMessage = confirmMessage, confirmTarget = confirmTarget, sender = sender) + + @deprecated("Use persistenceId.", since = "2.3.4") + override def processorId = persistenceId } /** diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistentChannel.scala b/akka-persistence/src/main/scala/akka/persistence/PersistentChannel.scala index f3077e75b0..c286ad40b5 100644 --- a/akka-persistence/src/main/scala/akka/persistence/PersistentChannel.scala +++ b/akka-persistence/src/main/scala/akka/persistence/PersistentChannel.scala @@ -205,9 +205,11 @@ final case class DeliveredByPersistentChannel( channelId: String, persistentSequenceNr: Long, deliverySequenceNr: Long = 0L, - channel: ActorRef = null) extends Delivered with PersistenceId { + channel: ActorRef = null) extends Delivered with PersistentId { - def persistenceId: String = channelId + override def persistenceId: String = channelId + @deprecated("Use persistenceId.", since = "2.3.4") + override def processorId = persistenceId def sequenceNr: Long = persistentSequenceNr def update(deliverySequenceNr: Long, channel: ActorRef): DeliveredByPersistentChannel = copy(deliverySequenceNr = deliverySequenceNr, channel = channel) diff --git a/akka-persistence/src/main/scala/akka/persistence/Processor.scala b/akka-persistence/src/main/scala/akka/persistence/Processor.scala index 60f141b814..7d1261fd7d 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Processor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Processor.scala @@ -223,7 +223,7 @@ private[akka] trait ProcessorImpl extends Actor with Recovery { */ @deprecated("deleteMessage(sequenceNr) will be removed. Instead, validate before persist, and use deleteMessages for pruning.", since = "2.3.4") def deleteMessage(sequenceNr: Long, permanent: Boolean): Unit = { - journal ! DeleteMessages(List(PersistenceIdImpl(persistenceId, sequenceNr)), permanent) + journal ! DeleteMessages(List(PersistentIdImpl(persistenceId, sequenceNr)), permanent) } /** diff --git a/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala b/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala index 7bf341effc..f9ce6dbf3a 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala @@ -13,7 +13,10 @@ package akka.persistence * @param timestamp time at which the snapshot was saved. */ @SerialVersionUID(1L) //#snapshot-metadata -final case class SnapshotMetadata(@deprecatedName('processorId) persistenceId: String, sequenceNr: Long, timestamp: Long = 0L) +final case class SnapshotMetadata(@deprecatedName('processorId) persistenceId: String, sequenceNr: Long, timestamp: Long = 0L) { + @deprecated("Use persistenceId instead.", since = "2.3.4") + def processorId: String = persistenceId +} //#snapshot-metadata /** @@ -120,7 +123,10 @@ private[persistence] object SnapshotProtocol { * @param criteria criteria for selecting a snapshot from which recovery should start. * @param toSequenceNr upper sequence number bound (inclusive) for recovery. */ - final case class LoadSnapshot(@deprecatedName('processorId) persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long) + final case class LoadSnapshot(@deprecatedName('processorId) persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long) { + @deprecated("Use persistenceId instead.", since = "2.3.4") + def processorId: String = persistenceId + } /** * Response message to a [[LoadSnapshot]] message. @@ -150,5 +156,8 @@ private[persistence] object SnapshotProtocol { * @param persistenceId persistent actor id. * @param criteria criteria for selecting snapshots to be deleted. */ - final case class DeleteSnapshots(@deprecatedName('processorId) persistenceId: String, criteria: SnapshotSelectionCriteria) + final case class DeleteSnapshots(@deprecatedName('processorId) persistenceId: String, criteria: SnapshotSelectionCriteria) { + @deprecated("Use persistenceId instead.", since = "2.3.4") + def processorId: String = persistenceId + } } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index 1af1f96046..a97eb5ead4 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -105,7 +105,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { * deleted, otherwise they are permanently deleted. */ @deprecated("asyncDeleteMessages will be removed.", since = "2.3.4") - def asyncDeleteMessages(messageIds: immutable.Seq[PersistenceId], permanent: Boolean): Future[Unit] + def asyncDeleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean): Future[Unit] /** * Plugin API: asynchronously deletes all persistent messages up to `toSequenceNr` diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala index ba1b4ee0ba..4d506ebdfc 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala @@ -43,7 +43,7 @@ private[persistence] trait AsyncWriteProxy extends AsyncWriteJournal with Stash def asyncWriteConfirmations(confirmations: immutable.Seq[PersistentConfirmation]): Future[Unit] = (store ? WriteConfirmations(confirmations)).mapTo[Unit] - def asyncDeleteMessages(messageIds: immutable.Seq[PersistenceId], permanent: Boolean): Future[Unit] = + def asyncDeleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean): Future[Unit] = (store ? DeleteMessages(messageIds, permanent)).mapTo[Unit] def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit] = @@ -78,7 +78,7 @@ private[persistence] object AsyncWriteTarget { final case class WriteConfirmations(confirmations: immutable.Seq[PersistentConfirmation]) @SerialVersionUID(1L) - final case class DeleteMessages(messageIds: immutable.Seq[PersistenceId], permanent: Boolean) + final case class DeleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean) @SerialVersionUID(1L) final case class DeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala index 8d82483a68..ec9a74a23f 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala @@ -97,7 +97,7 @@ trait SyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { * deleted, otherwise they are permanently deleted. */ @deprecated("deleteMessages will be removed.", since = "2.3.4") - def deleteMessages(messageIds: immutable.Seq[PersistenceId], permanent: Boolean): Unit + def deleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean): Unit /** * Plugin API: synchronously deletes all persistent messages up to `toSequenceNr` diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala index 8f378955bf..2541892a07 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala @@ -34,7 +34,7 @@ private[persistence] class InmemJournal extends AsyncWriteProxy { * INTERNAL API. */ private[persistence] trait InmemMessages { - // persistentActorId -> persistent message + // persistenceId -> persistent message var messages = Map.empty[String, Vector[PersistentRepr]] def add(p: PersistentRepr) = messages = messages + (messages.get(p.persistenceId) match { diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala index e33ba02d2d..166ec51570 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala @@ -22,7 +22,7 @@ abstract class AsyncWriteJournal extends AsyncRecovery with SAsyncWriteJournal w final def asyncWriteConfirmations(confirmations: immutable.Seq[PersistentConfirmation]) = doAsyncWriteConfirmations(confirmations.asJava).map(Unit.unbox) - final def asyncDeleteMessages(messageIds: immutable.Seq[PersistenceId], permanent: Boolean) = + final def asyncDeleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean) = doAsyncDeleteMessages(messageIds.asJava, permanent).map(Unit.unbox) final def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean) = diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala index 033eb0be7c..b89c85e07c 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala @@ -20,7 +20,7 @@ abstract class SyncWriteJournal extends AsyncRecovery with SSyncWriteJournal wit final def writeConfirmations(confirmations: immutable.Seq[PersistentConfirmation]) = doWriteConfirmations(confirmations.asJava) - final def deleteMessages(messageIds: immutable.Seq[PersistenceId], permanent: Boolean) = + final def deleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean) = doDeleteMessages(messageIds.asJava, permanent) final def deleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean) = diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala index 533e500dc5..3283ab3b88 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala @@ -46,7 +46,7 @@ private[persistence] trait LeveldbStore extends Actor with LeveldbIdMapping with def writeConfirmations(confirmations: immutable.Seq[PersistentConfirmation]) = withBatch(batch ⇒ confirmations.foreach(confirmation ⇒ addToConfirmationBatch(confirmation, batch))) - def deleteMessages(messageIds: immutable.Seq[PersistenceId], permanent: Boolean) = withBatch { batch ⇒ + def deleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean) = withBatch { batch ⇒ messageIds foreach { messageId ⇒ if (permanent) batch.delete(keyToBytes(Key(numericId(messageId.persistenceId), messageId.sequenceNr, 0))) else batch.put(keyToBytes(deletionKey(numericId(messageId.persistenceId), messageId.sequenceNr)), Array.emptyByteArray) diff --git a/akka-persistence/src/test/scala/akka/persistence/journal/chaos/ChaosJournal.scala b/akka-persistence/src/test/scala/akka/persistence/journal/chaos/ChaosJournal.scala index e41ca93fdb..cab45a07f8 100644 --- a/akka-persistence/src/test/scala/akka/persistence/journal/chaos/ChaosJournal.scala +++ b/akka-persistence/src/test/scala/akka/persistence/journal/chaos/ChaosJournal.scala @@ -24,7 +24,7 @@ class ReplayFailedException(ps: Seq[PersistentRepr]) class ReadHighestFailedException extends TestException(s"recovery failed when reading highest sequence number") -class DeleteFailedException(messageIds: immutable.Seq[PersistenceId]) +class DeleteFailedException(messageIds: immutable.Seq[PersistentId]) extends TestException(s"delete failed for message ids = [${messageIds}]") /** @@ -53,13 +53,13 @@ class ChaosJournal extends SyncWriteJournal { if (shouldFail(confirmFailureRate)) throw new ConfirmFailedException(confirmations) else confirmations.foreach(cnf ⇒ update(cnf.persistenceId, cnf.sequenceNr)(p ⇒ p.update(confirms = cnf.channelId +: p.confirms))) - def deleteMessages(messageIds: immutable.Seq[PersistenceId], permanent: Boolean): Unit = + def deleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean): Unit = if (shouldFail(deleteFailureRate)) throw new DeleteFailedException(messageIds) else if (permanent) messageIds.foreach(mid ⇒ update(mid.persistenceId, mid.sequenceNr)(_.update(deleted = true))) else messageIds.foreach(mid ⇒ del(mid.persistenceId, mid.sequenceNr)) def deleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Unit = - (1L to toSequenceNr).map(PersistenceIdImpl(persistenceId, _)) + (1L to toSequenceNr).map(PersistentIdImpl(persistenceId, _)) def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: (PersistentRepr) ⇒ Unit): Future[Unit] = if (shouldFail(replayFailureRate)) { diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java index d3cd3998c8..0aa29404c6 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java @@ -87,7 +87,7 @@ public class LambdaPersistencePluginDocTest { } @Override - public Future doAsyncDeleteMessages(Iterable messageIds, boolean permanent) { + public Future doAsyncDeleteMessages(Iterable messageIds, boolean permanent) { return null; } diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/SnapshotExample.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/SnapshotExample.java index 212deee006..58113b85b5 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/SnapshotExample.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/SnapshotExample.java @@ -9,7 +9,6 @@ import akka.actor.ActorSystem; import akka.actor.Props; import akka.japi.pf.ReceiveBuilder; import akka.persistence.AbstractPersistentActor; -import akka.persistence.Persistent; import akka.persistence.SnapshotOffer; import scala.PartialFunction; import scala.runtime.BoxedUnit;