From 4bb321a83a4ee9f366c300a75ec434fc17a0aca5 Mon Sep 17 00:00:00 2001 From: Konrad 'ktoso' Malawski Date: Mon, 23 Jun 2014 14:33:35 +0200 Subject: [PATCH] !per #15230 rename processorId => persistentId * This is NOT binary compatible, we're in an *experimental* module. * disabled binary compat checks for package akka.persistence * Source compatibility is retained, but users should migrate do the new method name ASAP. * Plugin APIs were migrated in a way that allows the old plugins to compile agains 2.3.4 without having to change anything. Hopefuly this will help authors migrate to 2.3.4 sooner. This is only source level compatible, not binary compatible. * added deprecation warnings on all processorId methods and provided bridges where possible * for users, the migration should be painless, they can still override the old method, and it'll work. But we encourage them to move to persistenceId; All delegation code will have to be removed afterwards ofc. Conflicts: akka-persistence/src/main/scala/akka/persistence/Channel.scala akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala akka-persistence/src/main/scala/akka/persistence/Persistent.scala akka-persistence/src/main/scala/akka/persistence/PersistentChannel.scala akka-persistence/src/main/scala/akka/persistence/Processor.scala akka-persistence/src/main/scala/akka/persistence/Snapshot.scala akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbKey.scala akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala project/AkkaBuild.scala --- .../docs/persistence/PersistenceDocTest.java | 18 +- .../persistence/PersistencePluginDocTest.java | 12 +- akka-docs/rst/java/lambda-persistence.rst | 16 +- akka-docs/rst/java/persistence.rst | 16 +- .../migration-guide-eventsourced-2.3.x.rst | 2 +- ...e-persistence-experimental-2.3.x-2.4.x.rst | 37 +++- .../docs/persistence/PersistenceDocSpec.scala | 24 +-- .../PersistencePluginDocSpec.scala | 12 +- akka-docs/rst/scala/persistence.rst | 18 +- .../journal/japi/AsyncRecoveryPlugin.java | 10 +- .../journal/japi/AsyncWritePlugin.java | 4 +- .../journal/japi/SyncWritePlugin.java | 4 +- .../serialization/MessageFormats.java | 204 +++++++++--------- .../snapshot/japi/SnapshotStorePlugin.java | 8 +- .../src/main/protobuf/MessageFormats.proto | 4 +- .../main/scala/akka/persistence/Channel.scala | 4 +- .../akka/persistence/JournalProtocol.scala | 14 +- .../scala/akka/persistence/Persistence.scala | 15 +- .../scala/akka/persistence/Persistent.scala | 50 +++-- .../akka/persistence/PersistentChannel.scala | 30 +-- .../scala/akka/persistence/Processor.scala | 30 +-- .../scala/akka/persistence/Recovery.scala | 9 +- .../scala/akka/persistence/Snapshot.scala | 12 +- .../scala/akka/persistence/Snapshotter.scala | 4 +- .../main/scala/akka/persistence/View.scala | 6 +- .../persistence/journal/AsyncRecovery.scala | 10 +- .../journal/AsyncWriteJournal.scala | 16 +- .../persistence/journal/AsyncWriteProxy.scala | 22 +- .../journal/SyncWriteJournal.scala | 16 +- .../journal/inmem/InmemJournal.scala | 16 +- .../journal/japi/AsyncRecovery.scala | 8 +- .../journal/japi/AsyncWriteJournal.scala | 6 +- .../journal/japi/SyncWriteJournal.scala | 6 +- .../journal/leveldb/LeveldbKey.scala | 10 +- .../journal/leveldb/LeveldbRecovery.scala | 22 +- .../journal/leveldb/LeveldbStore.scala | 14 +- .../main/scala/akka/persistence/package.scala | 4 +- .../serialization/MessageSerializer.scala | 28 +-- .../persistence/snapshot/SnapshotStore.scala | 12 +- .../snapshot/japi/SnapshotStore.scala | 8 +- .../snapshot/local/LocalSnapshotStore.scala | 18 +- .../scala/akka/persistence/FailureSpec.scala | 2 +- .../persistence/NumberProcessorSpec.scala | 9 +- .../akka/persistence/PerformanceSpec.scala | 14 +- .../akka/persistence/PersistenceSpec.scala | 2 +- .../persistence/PersistentChannelSpec.scala | 4 +- .../SnapshotDirectoryFailureSpec.scala | 2 +- .../SnapshotFailureRobustnessSpec.scala | 4 +- .../SnapshotSerializationSpec.scala | 4 +- .../scala/akka/persistence/SnapshotSpec.scala | 30 +-- .../scala/akka/persistence/ViewSpec.scala | 14 +- .../journal/chaos/ChaosJournal.scala | 26 +-- .../serialization/SerializerSpec.scala | 8 +- .../java/doc/LambdaPersistenceDocTest.java | 18 +- .../doc/LambdaPersistencePluginDocTest.java | 14 +- .../java/sample/persistence/ViewExample.java | 2 +- .../java/sample/persistence/ViewExample.java | 2 +- .../sample/persistence/ViewExample.scala | 3 +- 58 files changed, 502 insertions(+), 435 deletions(-) diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java index 4f70f830fc..60e1376338 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java @@ -22,9 +22,9 @@ public class PersistenceDocTest { public interface SomeOtherMessage {} public interface ProcessorMethods { - //#processor-id - public String processorId(); - //#processor-id + //#persistence-id + public String persistenceId(); + //#persistence-id //#recovery-status public boolean recoveryRunning(); public boolean recoveryFinished(); @@ -120,12 +120,12 @@ public class PersistenceDocTest { } class MyProcessor4 extends UntypedProcessor implements ProcessorMethods { - //#processor-id-override + //#persistence-id-override @Override - public String processorId() { - return "my-stable-processor-id"; + public String persistenceId() { + return "my-stable-persistence-id"; } - //#processor-id-override + //#persistence-id-override @Override public void onReceive(Object message) throws Exception {} } @@ -488,8 +488,8 @@ public class PersistenceDocTest { //#view class MyView extends UntypedView { @Override - public String processorId() { - return "some-processor-id"; + public String persistenceId() { + return "some-persistence-id"; } @Override diff --git a/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java index b40527ec3e..82dc3e9c42 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java @@ -53,7 +53,7 @@ public class PersistencePluginDocTest { class MySnapshotStore extends SnapshotStore { @Override - public Future> doLoadAsync(String processorId, SnapshotSelectionCriteria criteria) { + public Future> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria) { return null; } @@ -71,7 +71,7 @@ public class PersistencePluginDocTest { } @Override - public void doDelete(String processorId, SnapshotSelectionCriteria criteria) throws Exception { + public void doDelete(String persistenceId, SnapshotSelectionCriteria criteria) throws Exception { } } @@ -87,22 +87,22 @@ public class PersistencePluginDocTest { } @Override - public Future doAsyncDeleteMessages(Iterable messageIds, boolean permanent) { + public Future doAsyncDeleteMessages(Iterable messageIds, boolean permanent) { return null; } @Override - public Future doAsyncDeleteMessagesTo(String processorId, long toSequenceNr, boolean permanent) { + public Future doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent) { return null; } @Override - public Future doAsyncReplayMessages(String processorId, long fromSequenceNr, long toSequenceNr, long max, Procedure replayCallback) { + public Future doAsyncReplayMessages(String persistenceId, long fromSequenceNr, long toSequenceNr, long max, Procedure replayCallback) { return null; } @Override - public Future doAsyncReadHighestSequenceNr(String processorId, long fromSequenceNr) { + public Future doAsyncReadHighestSequenceNr(String persistenceId, long fromSequenceNr) { return null; } } diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index 4745b763e1..a8c77b5e98 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -161,30 +161,30 @@ Identifiers ----------- A processor must have an identifier that doesn't change across different actor incarnations. It defaults to the -``String`` representation of processor's path without the address part and can be obtained via the ``processorId`` +``String`` representation of processor's path without the address part and can be obtained via the ``persistenceId`` method. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#processor-id +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#persistence-id Applications can customize a processor's id by specifying an actor name during processor creation as shown in section :ref:`processors-java`. This changes that processor's name in its actor hierarchy and hence influences only -part of the processor id. To fully customize a processor's id, the ``processorId`` method must be overridden. +part of the processor id. To fully customize a processor's id, the ``persistenceId`` method must be overridden. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#processor-id-override +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#persistence-id-override -Overriding ``processorId`` is the recommended way to generate stable identifiers. +Overriding ``persistenceId`` is the recommended way to generate stable identifiers. .. _views-java-lambda: Views ===== -Views can be implemented by extending the ``AbstractView`` abstract class, implement the ``processorId`` method +Views can be implemented by extending the ``AbstractView`` abstract class, implement the ``persistenceId`` method and setting the “initial behavior” in the constructor by calling the :meth:`receive` method. .. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#view -The ``processorId`` identifies the processor from which the view receives journaled messages. It is not necessary +The ``persistenceId`` identifies the processor from which the view receives journaled messages. It is not necessary the referenced processor is actually running. Views read messages from a processor's journal directly. When a processor is started later and begins to write new messages, the corresponding view is updated automatically, by default. @@ -234,7 +234,7 @@ Applications can customize a view's id by specifying an actor name during view c name in its actor hierarchy and hence influences only part of the view id. To fully customize a view's id, the ``viewId`` method must be overridden. Overriding ``viewId`` is the recommended way to generate stable identifiers. -The ``viewId`` must differ from the referenced ``processorId``, unless :ref:`snapshots-java` of a view and its +The ``viewId`` must differ from the referenced ``persistenceId``, unless :ref:`snapshots-java` of a view and its processor shall be shared (which is what applications usually do not want). .. _channels-java-lambda: diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 804b1b1aae..b51a8c844b 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -175,30 +175,30 @@ Identifiers ----------- A processor must have an identifier that doesn't change across different actor incarnations. It defaults to the -``String`` representation of processor's path without the address part and can be obtained via the ``processorId`` +``String`` representation of processor's path without the address part and can be obtained via the ``persistenceId`` method. -.. includecode:: code/docs/persistence/PersistenceDocTest.java#processor-id +.. includecode:: code/docs/persistence/PersistenceDocTest.java#persistence-id Applications can customize a processor's id by specifying an actor name during processor creation as shown in section :ref:`processors-java`. This changes that processor's name in its actor hierarchy and hence influences only -part of the processor id. To fully customize a processor's id, the ``processorId`` method must be overridden. +part of the processor id. To fully customize a processor's id, the ``persistenceId`` method must be overridden. -.. includecode:: code/docs/persistence/PersistenceDocTest.java#processor-id-override +.. includecode:: code/docs/persistence/PersistenceDocTest.java#persistence-id-override -Overriding ``processorId`` is the recommended way to generate stable identifiers. +Overriding ``persistenceId`` is the recommended way to generate stable identifiers. .. _views-java: Views ===== -Views can be implemented by extending the ``UntypedView`` trait and implementing the ``onReceive`` and the ``processorId`` +Views can be implemented by extending the ``UntypedView`` trait and implementing the ``onReceive`` and the ``persistenceId`` methods. .. includecode:: code/docs/persistence/PersistenceDocTest.java#view -The ``processorId`` identifies the processor from which the view receives journaled messages. It is not necessary +The ``persistenceId`` identifies the processor from which the view receives journaled messages. It is not necessary the referenced processor is actually running. Views read messages from a processor's journal directly. When a processor is started later and begins to write new messages, the corresponding view is updated automatically, by default. @@ -248,7 +248,7 @@ Applications can customize a view's id by specifying an actor name during view c name in its actor hierarchy and hence influences only part of the view id. To fully customize a view's id, the ``viewId`` method must be overridden. Overriding ``viewId`` is the recommended way to generate stable identifiers. -The ``viewId`` must differ from the referenced ``processorId``, unless :ref:`snapshots-java` of a view and its +The ``viewId`` must differ from the referenced ``persistenceId``, unless :ref:`snapshots-java` of a view and its processor shall be shared (which is what applications usually do not want). .. _channels-java: diff --git a/akka-docs/rst/project/migration-guide-eventsourced-2.3.x.rst b/akka-docs/rst/project/migration-guide-eventsourced-2.3.x.rst index 879c73e13b..d7a97a6d72 100644 --- a/akka-docs/rst/project/migration-guide-eventsourced-2.3.x.rst +++ b/akka-docs/rst/project/migration-guide-eventsourced-2.3.x.rst @@ -84,7 +84,7 @@ Processors channel in order to avoid redundant replies during replay. Sender references of type ``PromiseActorRef`` are not journaled, they are ``system.deadLetters`` on replay. - Supports :ref:`snapshots`. -- :ref:`processor-identifiers` are of type ``String``, have a default value and can be overridden by applications. +- :ref:`persistence-identifiers` are of type ``String``, have a default value and can be overridden by applications. - Supports :ref:`batch-writes`. - Supports stashing of messages. diff --git a/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst index 937a378daa..ee0b86e1e7 100644 --- a/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst @@ -21,7 +21,42 @@ To extend ``PersistentActor``:: class NewPersistentProcessor extends PersistentActor { /*...*/ } -No other API changes are required for this migration. + +Renamed processorId to persistenceId +==================================== +In Akka Persistence ``2.3.3`` and previously, the main building block of applications were Processors. +Persistent messages, as well as processors implemented the ``processorId`` method to identify which persistent entity a message belonged to. + +This concept remains the same in Akka ``2.3.4``, yet we rename ``processorId`` to ``persistenceId`` because Processors will be removed, +and persistent messages can be used from different classes not only ``PersistentActor`` (Views, directly from Journals etc). + +We provided the renamed method also on already deprecated classes (Channels), so you can simply apply a global rename of ``processorId`` to ``persistenceId``. + +Plugin APIs: Renamed PersistentId to PersistenceId +================================================== +Following the removal of Processors and moving to ``persistenceId``, the plugin SPI visible type has changed. +The move from ``2.3.3`` to ``2.3.4`` should be relatively painless, and plugins will work even when using the deprecated ``PersistentId`` type. + +Change your implementations from:: + + def asyncWriteMessages(messages: immutable.Seq[PersistentRepr]): Future[Unit] = // ... + + def asyncDeleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean): Future[Unit] = { + val p = messageIds.head.processorId // old + // ... + } + +to:: + + def asyncWriteMessages(messages: immutable.Seq[PersistentRepr]): Future[Unit] = // ... + + def asyncDeleteMessages(messageIds: immutable.Seq[PersistenceId], permanent: Boolean): Future[Unit] = { + val p = messageIds.head.persistenceId // new + // ... + } + +Plugins written for ``2.3.3`` are source level compatible with ``2.3.4``, using the deprecated types, but will not work with future releases. +Plugin maintainers are asked to update their plugins to ``2.3.4`` as soon as possible. Removed Processor in favour of extending PersistentActor with persistAsync ========================================================================== diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala index 525471355d..42515eeb92 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala @@ -111,9 +111,9 @@ trait PersistenceDocSpec { new AnyRef { trait ProcessorMethods { - //#processor-id - def processorId: String - //#processor-id + //#persistence-id + def persistenceId: String + //#persistence-id //#recovery-status def recoveryRunning: Boolean def recoveryFinished: Boolean @@ -123,9 +123,9 @@ trait PersistenceDocSpec { //#current-message } class MyProcessor1 extends Processor with ProcessorMethods { - //#processor-id-override - override def processorId = "my-stable-processor-id" - //#processor-id-override + //#persistence-id-override + override def persistenceId = "my-stable-persistence-id" + //#persistence-id-override def receive = { case _ => } @@ -411,7 +411,7 @@ trait PersistenceDocSpec { //#view class MyView extends View { - def processorId: String = "some-processor-id" + override def persistenceId: String = "some-persistence-id" def receive: Actor.Receive = { case Persistent(payload, sequenceNr) => // ... @@ -440,26 +440,26 @@ trait PersistenceDocSpec { val materializer = FlowMaterializer(MaterializerSettings()) - val flow: Flow[Persistent] = PersistentFlow.fromProcessor("some-processor-id") + val flow: Flow[Persistent] = PersistentFlow.fromPersistence("some-persistence-id") val producer: Producer[Persistent] = flow.toProducer(materializer) //#producer-creation //#producer-buffer-size - PersistentFlow.fromProcessor("some-processor-id", PersistentPublisherSettings(maxBufferSize = 200)) + PersistentFlow.fromPersistence("some-persistence-id", PersistentPublisherSettings(maxBufferSize = 200)) //#producer-buffer-size //#producer-examples // 1 producer and 2 consumers: val producer1: Producer[Persistent] = - PersistentFlow.fromProcessor("processor-1").toProducer(materializer) + PersistentFlow.fromPersistence("processor-1").toProducer(materializer) Flow(producer1).foreach(p => println(s"consumer-1: ${p.payload}")).consume(materializer) Flow(producer1).foreach(p => println(s"consumer-2: ${p.payload}")).consume(materializer) // 2 producers (merged) and 1 consumer: val producer2: Producer[Persistent] = - PersistentFlow.fromProcessor("processor-2").toProducer(materializer) + PersistentFlow.fromPersistence("processor-2").toProducer(materializer) val producer3: Producer[Persistent] = - PersistentFlow.fromProcessor("processor-3").toProducer(materializer) + PersistentFlow.fromPersistence("processor-3").toProducer(materializer) Flow(producer2).merge(producer3).foreach { p => println(s"consumer-3: ${p.payload}") }.consume(materializer) diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala index 7419f5e8af..7a32f42010 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala @@ -126,16 +126,16 @@ trait SharedLeveldbPluginDocSpec { class MyJournal extends AsyncWriteJournal { def asyncWriteMessages(messages: Seq[PersistentRepr]): Future[Unit] = ??? def asyncWriteConfirmations(confirmations: Seq[PersistentConfirmation]): Future[Unit] = ??? - def asyncDeleteMessages(messageIds: Seq[PersistentId], permanent: Boolean): Future[Unit] = ??? - def asyncDeleteMessagesTo(processorId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit] = ??? - def asyncReplayMessages(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: (PersistentRepr) => Unit): Future[Unit] = ??? - def asyncReadHighestSequenceNr(processorId: String, fromSequenceNr: Long): Future[Long] = ??? + def asyncDeleteMessages(messageIds: Seq[PersistenceId], permanent: Boolean): Future[Unit] = ??? + def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit] = ??? + def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: (PersistentRepr) => Unit): Future[Unit] = ??? + def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = ??? } class MySnapshotStore extends SnapshotStore { - def loadAsync(processorId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = ??? + def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = ??? def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = ??? def saved(metadata: SnapshotMetadata): Unit = ??? def delete(metadata: SnapshotMetadata): Unit = ??? - def delete(processorId: String, criteria: SnapshotSelectionCriteria): Unit = ??? + def delete(persistenceId: String, criteria: SnapshotSelectionCriteria): Unit = ??? } diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 2fa7805d86..f3323dbb43 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -165,30 +165,30 @@ Identifiers ----------- A processor must have an identifier that doesn't change across different actor incarnations. It defaults to the -``String`` representation of processor's path without the address part and can be obtained via the ``processorId`` +``String`` representation of processor's path without the address part and can be obtained via the ``persistenceId`` method. -.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#processor-id +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#persistence-id Applications can customize a processor's id by specifying an actor name during processor creation as shown in section :ref:`processors`. This changes that processor's name in its actor hierarchy and hence influences only -part of the processor id. To fully customize a processor's id, the ``processorId`` method must be overridden. +part of the processor id. To fully customize a processor's id, the ``persistenceId`` method must be overridden. -.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#processor-id-override +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#persistence-id-override -Overriding ``processorId`` is the recommended way to generate stable identifiers. +Overriding ``persistenceId`` is the recommended way to generate stable identifiers. .. _views: Views ===== -Views can be implemented by extending the ``View`` trait and implementing the ``receive`` and the ``processorId`` +Views can be implemented by extending the ``View`` trait and implementing the ``receive`` and the ``persistenceId`` methods. .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#view -The ``processorId`` identifies the processor from which the view receives journaled messages. It is not necessary +The ``persistenceId`` identifies the processor from which the view receives journaled messages. It is not necessary the referenced processor is actually running. Views read messages from a processor's journal directly. When a processor is started later and begins to write new messages, the corresponding view is updated automatically, by default. @@ -238,7 +238,7 @@ Applications can customize a view's id by specifying an actor name during view c name in its actor hierarchy and hence influences only part of the view id. To fully customize a view's id, the ``viewId`` method must be overridden. Overriding ``viewId`` is the recommended way to generate stable identifiers. -The ``viewId`` must differ from the referenced ``processorId``, unless :ref:`snapshots` of a view and its +The ``viewId`` must differ from the referenced ``persistenceId``, unless :ref:`snapshots` of a view and its processor shall be shared (which is what applications usually do not want). .. _channels: @@ -378,7 +378,7 @@ creating the channel with the ``replyPersistent`` configuration parameter set to With this setting, either the successfully persisted message is replied to the sender or a ``PersistenceFailure`` message. In case the latter case, the sender should re-send the message. -.. _processor-identifiers: +.. _persistence-identifiers: Identifiers ----------- diff --git a/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncRecoveryPlugin.java b/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncRecoveryPlugin.java index 36cfad6733..d193f64c27 100644 --- a/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncRecoveryPlugin.java +++ b/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncRecoveryPlugin.java @@ -25,23 +25,23 @@ interface AsyncRecoveryPlugin { * The channel ids of delivery confirmations that are available for a replayed * message must be contained in that message's `confirms` sequence. * - * @param processorId processor id. + * @param persistenceId processor id. * @param fromSequenceNr sequence number where replay should start (inclusive). * @param toSequenceNr sequence number where replay should end (inclusive). * @param max maximum number of messages to be replayed. * @param replayCallback called to replay a single message. Can be called from any * thread. */ - Future doAsyncReplayMessages(String processorId, long fromSequenceNr, long toSequenceNr, long max, Procedure replayCallback); + Future doAsyncReplayMessages(String persistenceId, long fromSequenceNr, long toSequenceNr, long max, Procedure replayCallback); /** * Java API, Plugin API: asynchronously reads the highest stored sequence number - * for the given `processorId`. + * for the given `persistenceId`. * - * @param processorId processor id. + * @param persistenceId processor id. * @param fromSequenceNr hint where to start searching for the highest sequence * number. */ - Future doAsyncReadHighestSequenceNr(String processorId, long fromSequenceNr); + Future doAsyncReadHighestSequenceNr(String persistenceId, long fromSequenceNr); //#async-replay-plugin-api } diff --git a/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java b/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java index 838e957de6..638656ccb1 100644 --- a/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java +++ b/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java @@ -28,7 +28,7 @@ interface AsyncWritePlugin { * from the journal. If `permanent` is set to `false`, the persistent messages are * marked as deleted, otherwise they are permanently deleted. */ - Future doAsyncDeleteMessages(Iterable messageIds, boolean permanent); + Future doAsyncDeleteMessages(Iterable messageIds, boolean permanent); /** * Java API, Plugin API: synchronously deletes all persistent messages up to @@ -37,6 +37,6 @@ interface AsyncWritePlugin { * * @see AsyncRecoveryPlugin */ - Future doAsyncDeleteMessagesTo(String processorId, long toSequenceNr, boolean permanent); + Future doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent); //#async-write-plugin-api } diff --git a/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java b/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java index 39f52460ab..86600827df 100644 --- a/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java +++ b/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java @@ -26,7 +26,7 @@ interface SyncWritePlugin { * from the journal. If `permanent` is set to `false`, the persistent messages are * marked as deleted, otherwise they are permanently deleted. */ - void doDeleteMessages(Iterable messageIds, boolean permanent); + void doDeleteMessages(Iterable messageIds, boolean permanent); /** * Java API, Plugin API: synchronously deletes all persistent messages up to @@ -35,6 +35,6 @@ interface SyncWritePlugin { * * @see AsyncRecoveryPlugin */ - void doDeleteMessagesTo(String processorId, long toSequenceNr, boolean permanent); + void doDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent); //#sync-write-plugin-api } diff --git a/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java b/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java index 340c98e995..1d5169b9d6 100644 --- a/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java +++ b/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java @@ -721,20 +721,20 @@ public final class MessageFormats { */ long getSequenceNr(); - // optional string processorId = 3; + // optional string persistenceId = 3; /** - * optional string processorId = 3; + * optional string persistenceId = 3; */ - boolean hasProcessorId(); + boolean hasPersistenceId(); /** - * optional string processorId = 3; + * optional string persistenceId = 3; */ - java.lang.String getProcessorId(); + java.lang.String getPersistenceId(); /** - * optional string processorId = 3; + * optional string persistenceId = 3; */ com.google.protobuf.ByteString - getProcessorIdBytes(); + getPersistenceIdBytes(); // optional bool deleted = 4; /** @@ -901,7 +901,7 @@ public final class MessageFormats { } case 26: { bitField0_ |= 0x00000004; - processorId_ = input.readBytes(); + persistenceId_ = input.readBytes(); break; } case 32: { @@ -1031,20 +1031,20 @@ public final class MessageFormats { return sequenceNr_; } - // optional string processorId = 3; - public static final int PROCESSORID_FIELD_NUMBER = 3; - private java.lang.Object processorId_; + // optional string persistenceId = 3; + public static final int PersistenceId_FIELD_NUMBER = 3; + private java.lang.Object persistenceId_; /** - * optional string processorId = 3; + * optional string persistenceId = 3; */ - public boolean hasProcessorId() { + public boolean hasPersistenceId() { return ((bitField0_ & 0x00000004) == 0x00000004); } /** - * optional string processorId = 3; + * optional string persistenceId = 3; */ - public java.lang.String getProcessorId() { - java.lang.Object ref = processorId_; + public java.lang.String getPersistenceId() { + java.lang.Object ref = persistenceId_; if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { @@ -1052,22 +1052,22 @@ public final class MessageFormats { (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { - processorId_ = s; + persistenceId_ = s; } return s; } } /** - * optional string processorId = 3; + * optional string persistenceId = 3; */ public com.google.protobuf.ByteString - getProcessorIdBytes() { - java.lang.Object ref = processorId_; + getPersistenceIdBytes() { + java.lang.Object ref = persistenceId_; if (ref instanceof java.lang.String) { com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); - processorId_ = b; + persistenceId_ = b; return b; } else { return (com.google.protobuf.ByteString) ref; @@ -1263,7 +1263,7 @@ public final class MessageFormats { private void initFields() { payload_ = akka.persistence.serialization.MessageFormats.PersistentPayload.getDefaultInstance(); sequenceNr_ = 0L; - processorId_ = ""; + persistenceId_ = ""; deleted_ = false; redeliveries_ = 0; confirms_ = com.google.protobuf.LazyStringArrayList.EMPTY; @@ -1297,7 +1297,7 @@ public final class MessageFormats { output.writeInt64(2, sequenceNr_); } if (((bitField0_ & 0x00000004) == 0x00000004)) { - output.writeBytes(3, getProcessorIdBytes()); + output.writeBytes(3, getPersistenceIdBytes()); } if (((bitField0_ & 0x00000008) == 0x00000008)) { output.writeBool(4, deleted_); @@ -1339,7 +1339,7 @@ public final class MessageFormats { } if (((bitField0_ & 0x00000004) == 0x00000004)) { size += com.google.protobuf.CodedOutputStream - .computeBytesSize(3, getProcessorIdBytes()); + .computeBytesSize(3, getPersistenceIdBytes()); } if (((bitField0_ & 0x00000008) == 0x00000008)) { size += com.google.protobuf.CodedOutputStream @@ -1500,7 +1500,7 @@ public final class MessageFormats { bitField0_ = (bitField0_ & ~0x00000001); sequenceNr_ = 0L; bitField0_ = (bitField0_ & ~0x00000002); - processorId_ = ""; + persistenceId_ = ""; bitField0_ = (bitField0_ & ~0x00000004); deleted_ = false; bitField0_ = (bitField0_ & ~0x00000008); @@ -1563,7 +1563,7 @@ public final class MessageFormats { if (((from_bitField0_ & 0x00000004) == 0x00000004)) { to_bitField0_ |= 0x00000004; } - result.processorId_ = processorId_; + result.persistenceId_ = persistenceId_; if (((from_bitField0_ & 0x00000008) == 0x00000008)) { to_bitField0_ |= 0x00000008; } @@ -1620,9 +1620,9 @@ public final class MessageFormats { if (other.hasSequenceNr()) { setSequenceNr(other.getSequenceNr()); } - if (other.hasProcessorId()) { + if (other.hasPersistenceId()) { bitField0_ |= 0x00000004; - processorId_ = other.processorId_; + persistenceId_ = other.persistenceId_; onChanged(); } if (other.hasDeleted()) { @@ -1840,76 +1840,76 @@ public final class MessageFormats { return this; } - // optional string processorId = 3; - private java.lang.Object processorId_ = ""; + // optional string persistenceId = 3; + private java.lang.Object persistenceId_ = ""; /** - * optional string processorId = 3; + * optional string persistenceId = 3; */ - public boolean hasProcessorId() { + public boolean hasPersistenceId() { return ((bitField0_ & 0x00000004) == 0x00000004); } /** - * optional string processorId = 3; + * optional string persistenceId = 3; */ - public java.lang.String getProcessorId() { - java.lang.Object ref = processorId_; + public java.lang.String getPersistenceId() { + java.lang.Object ref = persistenceId_; if (!(ref instanceof java.lang.String)) { java.lang.String s = ((com.google.protobuf.ByteString) ref) .toStringUtf8(); - processorId_ = s; + persistenceId_ = s; return s; } else { return (java.lang.String) ref; } } /** - * optional string processorId = 3; + * optional string persistenceId = 3; */ public com.google.protobuf.ByteString - getProcessorIdBytes() { - java.lang.Object ref = processorId_; + getPersistenceIdBytes() { + java.lang.Object ref = persistenceId_; if (ref instanceof String) { com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); - processorId_ = b; + persistenceId_ = b; return b; } else { return (com.google.protobuf.ByteString) ref; } } /** - * optional string processorId = 3; + * optional string persistenceId = 3; */ - public Builder setProcessorId( + public Builder setPersistenceId( java.lang.String value) { if (value == null) { throw new NullPointerException(); } bitField0_ |= 0x00000004; - processorId_ = value; + persistenceId_ = value; onChanged(); return this; } /** - * optional string processorId = 3; + * optional string persistenceId = 3; */ - public Builder clearProcessorId() { + public Builder clearPersistenceId() { bitField0_ = (bitField0_ & ~0x00000004); - processorId_ = getDefaultInstance().getProcessorId(); + persistenceId_ = getDefaultInstance().getPersistenceId(); onChanged(); return this; } /** - * optional string processorId = 3; + * optional string persistenceId = 3; */ - public Builder setProcessorIdBytes( + public Builder setPersistenceIdBytes( com.google.protobuf.ByteString value) { if (value == null) { throw new NullPointerException(); } bitField0_ |= 0x00000004; - processorId_ = value; + persistenceId_ = value; onChanged(); return this; } @@ -2965,20 +2965,20 @@ public final class MessageFormats { public interface DeliveredMessageOrBuilder extends com.google.protobuf.MessageOrBuilder { - // optional string processorId = 1; + // optional string persistenceId = 1; /** - * optional string processorId = 1; + * optional string persistenceId = 1; */ - boolean hasProcessorId(); + boolean hasPersistenceId(); /** - * optional string processorId = 1; + * optional string persistenceId = 1; */ - java.lang.String getProcessorId(); + java.lang.String getPersistenceId(); /** - * optional string processorId = 1; + * optional string persistenceId = 1; */ com.google.protobuf.ByteString - getProcessorIdBytes(); + getPersistenceIdBytes(); // optional string channelId = 2; /** @@ -3083,7 +3083,7 @@ public final class MessageFormats { } case 10: { bitField0_ |= 0x00000001; - processorId_ = input.readBytes(); + persistenceId_ = input.readBytes(); break; } case 18: { @@ -3146,20 +3146,20 @@ public final class MessageFormats { } private int bitField0_; - // optional string processorId = 1; - public static final int PROCESSORID_FIELD_NUMBER = 1; - private java.lang.Object processorId_; + // optional string persistenceId = 1; + public static final int PersistenceId_FIELD_NUMBER = 1; + private java.lang.Object persistenceId_; /** - * optional string processorId = 1; + * optional string persistenceId = 1; */ - public boolean hasProcessorId() { + public boolean hasPersistenceId() { return ((bitField0_ & 0x00000001) == 0x00000001); } /** - * optional string processorId = 1; + * optional string persistenceId = 1; */ - public java.lang.String getProcessorId() { - java.lang.Object ref = processorId_; + public java.lang.String getPersistenceId() { + java.lang.Object ref = persistenceId_; if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { @@ -3167,22 +3167,22 @@ public final class MessageFormats { (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { - processorId_ = s; + persistenceId_ = s; } return s; } } /** - * optional string processorId = 1; + * optional string persistenceId = 1; */ public com.google.protobuf.ByteString - getProcessorIdBytes() { - java.lang.Object ref = processorId_; + getPersistenceIdBytes() { + java.lang.Object ref = persistenceId_; if (ref instanceof java.lang.String) { com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); - processorId_ = b; + persistenceId_ = b; return b; } else { return (com.google.protobuf.ByteString) ref; @@ -3308,7 +3308,7 @@ public final class MessageFormats { } private void initFields() { - processorId_ = ""; + persistenceId_ = ""; channelId_ = ""; persistentSequenceNr_ = 0L; deliverySequenceNr_ = 0L; @@ -3327,7 +3327,7 @@ public final class MessageFormats { throws java.io.IOException { getSerializedSize(); if (((bitField0_ & 0x00000001) == 0x00000001)) { - output.writeBytes(1, getProcessorIdBytes()); + output.writeBytes(1, getPersistenceIdBytes()); } if (((bitField0_ & 0x00000002) == 0x00000002)) { output.writeBytes(2, getChannelIdBytes()); @@ -3352,7 +3352,7 @@ public final class MessageFormats { size = 0; if (((bitField0_ & 0x00000001) == 0x00000001)) { size += com.google.protobuf.CodedOutputStream - .computeBytesSize(1, getProcessorIdBytes()); + .computeBytesSize(1, getPersistenceIdBytes()); } if (((bitField0_ & 0x00000002) == 0x00000002)) { size += com.google.protobuf.CodedOutputStream @@ -3486,7 +3486,7 @@ public final class MessageFormats { public Builder clear() { super.clear(); - processorId_ = ""; + persistenceId_ = ""; bitField0_ = (bitField0_ & ~0x00000001); channelId_ = ""; bitField0_ = (bitField0_ & ~0x00000002); @@ -3527,7 +3527,7 @@ public final class MessageFormats { if (((from_bitField0_ & 0x00000001) == 0x00000001)) { to_bitField0_ |= 0x00000001; } - result.processorId_ = processorId_; + result.persistenceId_ = persistenceId_; if (((from_bitField0_ & 0x00000002) == 0x00000002)) { to_bitField0_ |= 0x00000002; } @@ -3560,9 +3560,9 @@ public final class MessageFormats { public Builder mergeFrom(akka.persistence.serialization.MessageFormats.DeliveredMessage other) { if (other == akka.persistence.serialization.MessageFormats.DeliveredMessage.getDefaultInstance()) return this; - if (other.hasProcessorId()) { + if (other.hasPersistenceId()) { bitField0_ |= 0x00000001; - processorId_ = other.processorId_; + persistenceId_ = other.persistenceId_; onChanged(); } if (other.hasChannelId()) { @@ -3608,76 +3608,76 @@ public final class MessageFormats { } private int bitField0_; - // optional string processorId = 1; - private java.lang.Object processorId_ = ""; + // optional string persistenceId = 1; + private java.lang.Object persistenceId_ = ""; /** - * optional string processorId = 1; + * optional string persistenceId = 1; */ - public boolean hasProcessorId() { + public boolean hasPersistenceId() { return ((bitField0_ & 0x00000001) == 0x00000001); } /** - * optional string processorId = 1; + * optional string persistenceId = 1; */ - public java.lang.String getProcessorId() { - java.lang.Object ref = processorId_; + public java.lang.String getPersistenceId() { + java.lang.Object ref = persistenceId_; if (!(ref instanceof java.lang.String)) { java.lang.String s = ((com.google.protobuf.ByteString) ref) .toStringUtf8(); - processorId_ = s; + persistenceId_ = s; return s; } else { return (java.lang.String) ref; } } /** - * optional string processorId = 1; + * optional string persistenceId = 1; */ public com.google.protobuf.ByteString - getProcessorIdBytes() { - java.lang.Object ref = processorId_; + getPersistenceIdBytes() { + java.lang.Object ref = persistenceId_; if (ref instanceof String) { com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); - processorId_ = b; + persistenceId_ = b; return b; } else { return (com.google.protobuf.ByteString) ref; } } /** - * optional string processorId = 1; + * optional string persistenceId = 1; */ - public Builder setProcessorId( + public Builder setPersistenceId( java.lang.String value) { if (value == null) { throw new NullPointerException(); } bitField0_ |= 0x00000001; - processorId_ = value; + persistenceId_ = value; onChanged(); return this; } /** - * optional string processorId = 1; + * optional string persistenceId = 1; */ - public Builder clearProcessorId() { + public Builder clearPersistenceId() { bitField0_ = (bitField0_ & ~0x00000001); - processorId_ = getDefaultInstance().getProcessorId(); + persistenceId_ = getDefaultInstance().getPersistenceId(); onChanged(); return this; } /** - * optional string processorId = 1; + * optional string persistenceId = 1; */ - public Builder setProcessorIdBytes( + public Builder setPersistenceIdBytes( com.google.protobuf.ByteString value) { if (value == null) { throw new NullPointerException(); } bitField0_ |= 0x00000001; - processorId_ = value; + persistenceId_ = value; onChanged(); return this; } @@ -4620,14 +4620,14 @@ public final class MessageFormats { "ageBatch\022!\n\005batch\030\001 \003(\0132\022.PersistentMess" + "age\"\201\002\n\021PersistentMessage\022#\n\007payload\030\001 \001" + "(\0132\022.PersistentPayload\022\022\n\nsequenceNr\030\002 \001" + - "(\003\022\023\n\013processorId\030\003 \001(\t\022\017\n\007deleted\030\004 \001(\010" + + "(\003\022\023\n\013persistenceId\030\003 \001(\t\022\017\n\007deleted\030\004 \001(\010" + "\022\024\n\014redeliveries\030\006 \001(\005\022\020\n\010confirms\030\007 \003(\t" + "\022\023\n\013confirmable\030\010 \001(\010\022)\n\016confirmMessage\030" + "\t \001(\0132\021.DeliveredMessage\022\025\n\rconfirmTarge" + "t\030\n \001(\t\022\016\n\006sender\030\013 \001(\t\"S\n\021PersistentPay" + "load\022\024\n\014serializerId\030\001 \002(\005\022\017\n\007payload\030\002 ", "\002(\014\022\027\n\017payloadManifest\030\003 \001(\014\"\205\001\n\020Deliver" + - "edMessage\022\023\n\013processorId\030\001 \001(\t\022\021\n\tchanne" + + "edMessage\022\023\n\013persistenceId\030\001 \001(\t\022\021\n\tchanne" + "lId\030\002 \001(\t\022\034\n\024persistentSequenceNr\030\003 \001(\003\022" + "\032\n\022deliverySequenceNr\030\004 \001(\003\022\017\n\007channel\030\005" + " \001(\t\"M\n\016DeliverMessage\022&\n\npersistent\030\001 \001" + @@ -4650,7 +4650,7 @@ public final class MessageFormats { internal_static_PersistentMessage_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_PersistentMessage_descriptor, - new java.lang.String[] { "Payload", "SequenceNr", "ProcessorId", "Deleted", "Redeliveries", "Confirms", "Confirmable", "ConfirmMessage", "ConfirmTarget", "Sender", }); + new java.lang.String[] { "Payload", "SequenceNr", "PersistenceId", "Deleted", "Redeliveries", "Confirms", "Confirmable", "ConfirmMessage", "ConfirmTarget", "Sender", }); internal_static_PersistentPayload_descriptor = getDescriptor().getMessageTypes().get(2); internal_static_PersistentPayload_fieldAccessorTable = new @@ -4662,7 +4662,7 @@ public final class MessageFormats { internal_static_DeliveredMessage_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_DeliveredMessage_descriptor, - new java.lang.String[] { "ProcessorId", "ChannelId", "PersistentSequenceNr", "DeliverySequenceNr", "Channel", }); + new java.lang.String[] { "PersistenceId", "ChannelId", "PersistentSequenceNr", "DeliverySequenceNr", "Channel", }); internal_static_DeliverMessage_descriptor = getDescriptor().getMessageTypes().get(4); internal_static_DeliverMessage_fieldAccessorTable = new diff --git a/akka-persistence/src/main/java/akka/persistence/snapshot/japi/SnapshotStorePlugin.java b/akka-persistence/src/main/java/akka/persistence/snapshot/japi/SnapshotStorePlugin.java index c96292e53f..3b506ce87f 100644 --- a/akka-persistence/src/main/java/akka/persistence/snapshot/japi/SnapshotStorePlugin.java +++ b/akka-persistence/src/main/java/akka/persistence/snapshot/japi/SnapshotStorePlugin.java @@ -14,10 +14,10 @@ interface SnapshotStorePlugin { /** * Java API, Plugin API: asynchronously loads a snapshot. * - * @param processorId processor id. + * @param persistenceId processor id. * @param criteria selection criteria for loading. */ - Future> doLoadAsync(String processorId, SnapshotSelectionCriteria criteria); + Future> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria); /** * Java API, Plugin API: asynchronously saves a snapshot. @@ -44,9 +44,9 @@ interface SnapshotStorePlugin { /** * Java API, Plugin API: deletes all snapshots matching `criteria`. * - * @param processorId processor id. + * @param persistenceId processor id. * @param criteria selection criteria for deleting. */ - void doDelete(String processorId, SnapshotSelectionCriteria criteria) throws Exception; + void doDelete(String persistenceId, SnapshotSelectionCriteria criteria) throws Exception; //#snapshot-store-plugin-api } diff --git a/akka-persistence/src/main/protobuf/MessageFormats.proto b/akka-persistence/src/main/protobuf/MessageFormats.proto index c96747ddcc..3cc5db7276 100644 --- a/akka-persistence/src/main/protobuf/MessageFormats.proto +++ b/akka-persistence/src/main/protobuf/MessageFormats.proto @@ -12,7 +12,7 @@ message PersistentMessageBatch { message PersistentMessage { optional PersistentPayload payload = 1; optional int64 sequenceNr = 2; - optional string processorId = 3; + optional string persistenceId = 3; optional bool deleted = 4; optional int32 redeliveries = 6; repeated string confirms = 7; @@ -29,7 +29,7 @@ message PersistentPayload { } message DeliveredMessage { - optional string processorId = 1; + optional string persistenceId = 1; optional string channelId = 2; optional int64 persistentSequenceNr = 3; optional int64 deliverySequenceNr = 4; diff --git a/akka-persistence/src/main/scala/akka/persistence/Channel.scala b/akka-persistence/src/main/scala/akka/persistence/Channel.scala index 95c6c06cbe..6c85941e85 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Channel.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Channel.scala @@ -139,7 +139,7 @@ final class Channel private[akka] (_channelId: Option[String], channelSettings: private def prepareDelivery(persistent: PersistentRepr): PersistentRepr = ConfirmablePersistentImpl(persistent, confirmTarget = journal, - confirmMessage = DeliveredByChannel(persistent.processorId, id, persistent.sequenceNr, channel = self)) + confirmMessage = DeliveredByChannel(persistent.persistenceId, id, persistent.sequenceNr, channel = self)) } object Channel { @@ -216,7 +216,7 @@ trait Delivered extends Message { * Plugin API. */ final case class DeliveredByChannel( - processorId: String, + @deprecatedName('processorId) persistenceId: String, channelId: String, persistentSequenceNr: Long, deliverySequenceNr: Long = 0L, diff --git a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala index af60f07348..9753d957ae 100644 --- a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala +++ b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala @@ -18,12 +18,12 @@ private[persistence] object JournalProtocol { * Request to delete messages identified by `messageIds`. If `permanent` is set to `false`, * the persistent messages are marked as deleted, otherwise they are permanently deleted. */ - final case class DeleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean, requestor: Option[ActorRef] = None) + final case class DeleteMessages(messageIds: immutable.Seq[PersistenceId], permanent: Boolean, requestor: Option[ActorRef] = None) /** * Reply message to a successful [[DeleteMessages]] request. */ - final case class DeleteMessagesSuccess(messageIds: immutable.Seq[PersistentId]) + final case class DeleteMessagesSuccess(messageIds: immutable.Seq[PersistenceId]) /** * Reply message to a failed [[DeleteMessages]] request. @@ -35,7 +35,7 @@ private[persistence] object JournalProtocol { * (inclusive). If `permanent` is set to `false`, the persistent messages are marked * as deleted in the journal, otherwise they are permanently deleted from the journal. */ - final case class DeleteMessagesTo(processorId: String, toSequenceNr: Long, permanent: Boolean) + final case class DeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean) /** * Request to write delivery confirmations. @@ -113,11 +113,11 @@ private[persistence] object JournalProtocol { * @param fromSequenceNr sequence number where replay should start (inclusive). * @param toSequenceNr sequence number where replay should end (inclusive). * @param max maximum number of messages to be replayed. - * @param processorId requesting processor id. + * @param persistenceId requesting processor id. * @param processor requesting processor. * @param replayDeleted `true` if messages marked as deleted shall be replayed. */ - final case class ReplayMessages(fromSequenceNr: Long, toSequenceNr: Long, max: Long, processorId: String, processor: ActorRef, replayDeleted: Boolean = false) + final case class ReplayMessages(fromSequenceNr: Long, toSequenceNr: Long, max: Long, persistenceId: String, processor: ActorRef, replayDeleted: Boolean = false) /** * Reply message to a [[ReplayMessages]] request. A separate reply is sent to the requestor for each @@ -143,10 +143,10 @@ private[persistence] object JournalProtocol { * Request to read the highest stored sequence number of a given processor. * * @param fromSequenceNr optional hint where to start searching for the maximum sequence number. - * @param processorId requesting processor id. + * @param persistenceId requesting processor id. * @param processor requesting processor. */ - final case class ReadHighestSequenceNr(fromSequenceNr: Long = 1L, processorId: String, processor: ActorRef) + final case class ReadHighestSequenceNr(fromSequenceNr: Long = 1L, persistenceId: String, processor: ActorRef) /** * Reply message to a successful [[ReadHighestSequenceNr]] request. diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala index db9744e473..048e642d86 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala @@ -102,26 +102,33 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { /** * Creates a canonical processor id from a processor actor ref. */ + @deprecated("Use `persistenceId` instead. Processor will be removed.", since = "2.3.4") def processorId(processor: ActorRef): String = id(processor) + /** + * Creates a canonical persistent actor id from a processor actor ref. + */ + def persistenceId(persistentActor: ActorRef): String = id(persistentActor) + /** * Creates a canonical channel id from a channel actor ref. */ + @deprecated("Channels will be removed. You may want to use `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4") def channelId(channel: ActorRef): String = id(channel) /** - * Returns a snapshot store for a processor identified by `processorId`. + * Returns a snapshot store for a processor identified by `persistenceId`. */ - def snapshotStoreFor(processorId: String): ActorRef = { + def snapshotStoreFor(persistenceId: String): ActorRef = { // Currently returns a snapshot store singleton but this methods allows for later // optimizations where each processor can have its own snapshot store actor. snapshotStore } /** - * Returns a journal for a processor identified by `processorId`. + * Returns a journal for a processor identified by `persistenceId`. */ - def journalFor(processorId: String): ActorRef = { + def journalFor(persistenceId: String): ActorRef = { // Currently returns a journal singleton but this methods allows for later // optimizations where each processor can have its own journal actor. journal diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala index 2c957c4156..bf08fbb308 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala @@ -128,19 +128,35 @@ case class PersistentBatch(batch: immutable.Seq[Resequenceable]) extends Message * Plugin API: confirmation entry written by journal plugins. */ trait PersistentConfirmation { - def processorId: String + @deprecated("Use `persistenceId` instead. Processor will be removed.", since = "2.3.4") + final def processorId: String = persistenceId + def persistenceId: String def channelId: String def sequenceNr: Long } /** * Plugin API: persistent message identifier. + * + * Deprecated, please use [[PersistenceId]]. */ -trait PersistentId { +@deprecated("Use PersistenceId instead.", since = "2.3.4") +trait PersistentId extends PersistenceId { /** - * Id of processor that journals a persistent message + * Persistent id that journals a persistent message */ - def processorId: String + @deprecated("Use `persistenceId` instead.", since = "2.3.4") + def processorId: String = persistenceId +} + +/** + * Plugin API: persistent message identifier. + */ +trait PersistenceId { + /** + * Persistent id that journals a persistent message + */ + def persistenceId: String /** * A persistent message's sequence number. @@ -151,7 +167,7 @@ trait PersistentId { /** * INTERNAL API. */ -private[persistence] final case class PersistentIdImpl(processorId: String, sequenceNr: Long) extends PersistentId +private[persistence] final case class PersistenceIdImpl(persistenceId: String, sequenceNr: Long) extends PersistenceId /** * Plugin API: representation of a persistent message in the journal plugin API. @@ -160,7 +176,7 @@ private[persistence] final case class PersistentIdImpl(processorId: String, sequ * @see [[journal.AsyncWriteJournal]] * @see [[journal.AsyncRecovery]] */ -trait PersistentRepr extends Persistent with Resequenceable with PersistentId with Message { +trait PersistentRepr extends Persistent with Resequenceable with PersistenceId with Message { // todo we want to get rid of the Persistent() wrapper from user land; PersistentRepr is here to stay. #15230 import scala.collection.JavaConverters._ @@ -229,7 +245,7 @@ trait PersistentRepr extends Persistent with Resequenceable with PersistentId wi */ def update( sequenceNr: Long = sequenceNr, - processorId: String = processorId, + @deprecatedName('processorId) persistenceId: String = persistenceId, deleted: Boolean = deleted, redeliveries: Int = redeliveries, confirms: immutable.Seq[String] = confirms, @@ -250,7 +266,7 @@ object PersistentRepr { def apply( payload: Any, sequenceNr: Long = 0L, - processorId: String = PersistentRepr.Undefined, + @deprecatedName('processorId) persistenceId: String = PersistentRepr.Undefined, deleted: Boolean = false, redeliveries: Int = 0, confirms: immutable.Seq[String] = Nil, @@ -258,8 +274,8 @@ object PersistentRepr { confirmMessage: Delivered = null, confirmTarget: ActorRef = null, sender: ActorRef = null) = - if (confirmable) ConfirmablePersistentImpl(payload, sequenceNr, processorId, deleted, redeliveries, confirms, confirmMessage, confirmTarget, sender) - else PersistentImpl(payload, sequenceNr, processorId, deleted, confirms, sender) + if (confirmable) ConfirmablePersistentImpl(payload, sequenceNr, persistenceId, deleted, redeliveries, confirms, confirmMessage, confirmTarget, sender) + else PersistentImpl(payload, sequenceNr, persistenceId, deleted, confirms, sender) /** * Java API, Plugin API. @@ -281,7 +297,7 @@ object PersistentBatch { private[persistence] final case class PersistentImpl( payload: Any, sequenceNr: Long, - processorId: String, + @deprecatedName('processorId) persistenceId: String, deleted: Boolean, confirms: immutable.Seq[String], sender: ActorRef) extends Persistent with PersistentRepr { @@ -294,14 +310,14 @@ private[persistence] final case class PersistentImpl( def update( sequenceNr: Long, - processorId: String, + @deprecatedName('processorId) persistenceId: String, deleted: Boolean, redeliveries: Int, confirms: immutable.Seq[String], confirmMessage: Delivered, confirmTarget: ActorRef, sender: ActorRef) = - copy(sequenceNr = sequenceNr, processorId = processorId, deleted = deleted, confirms = confirms, sender = sender) + copy(sequenceNr = sequenceNr, persistenceId = persistenceId, deleted = deleted, confirms = confirms, sender = sender) val redeliveries: Int = 0 val confirmable: Boolean = false @@ -315,7 +331,7 @@ private[persistence] final case class PersistentImpl( private[persistence] final case class ConfirmablePersistentImpl( payload: Any, sequenceNr: Long, - processorId: String, + @deprecatedName('processorId) persistenceId: String, deleted: Boolean, redeliveries: Int, confirms: immutable.Seq[String], @@ -334,8 +350,8 @@ private[persistence] final case class ConfirmablePersistentImpl( def prepareWrite(sender: ActorRef) = copy(sender = sender, confirmMessage = null, confirmTarget = null) - def update(sequenceNr: Long, processorId: String, deleted: Boolean, redeliveries: Int, confirms: immutable.Seq[String], confirmMessage: Delivered, confirmTarget: ActorRef, sender: ActorRef) = - copy(sequenceNr = sequenceNr, processorId = processorId, deleted = deleted, redeliveries = redeliveries, confirms = confirms, confirmMessage = confirmMessage, confirmTarget = confirmTarget, sender = sender) + def update(sequenceNr: Long, @deprecatedName('processorId) persistenceId: String, deleted: Boolean, redeliveries: Int, confirms: immutable.Seq[String], confirmMessage: Delivered, confirmTarget: ActorRef, sender: ActorRef) = + copy(sequenceNr = sequenceNr, persistenceId = persistenceId, deleted = deleted, redeliveries = redeliveries, confirms = confirms, confirmMessage = confirmMessage, confirmTarget = confirmTarget, sender = sender) } /** @@ -343,5 +359,5 @@ private[persistence] final case class ConfirmablePersistentImpl( */ private[persistence] object ConfirmablePersistentImpl { def apply(persistent: PersistentRepr, confirmMessage: Delivered, confirmTarget: ActorRef = null): ConfirmablePersistentImpl = - ConfirmablePersistentImpl(persistent.payload, persistent.sequenceNr, persistent.processorId, persistent.deleted, persistent.redeliveries, persistent.confirms, confirmMessage, confirmTarget, persistent.sender) + ConfirmablePersistentImpl(persistent.payload, persistent.sequenceNr, persistent.persistenceId, persistent.deleted, persistent.redeliveries, persistent.confirms, confirmMessage, confirmTarget, persistent.sender) } diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistentChannel.scala b/akka-persistence/src/main/scala/akka/persistence/PersistentChannel.scala index 06c3be1c9f..5132253f58 100644 --- a/akka-persistence/src/main/scala/akka/persistence/PersistentChannel.scala +++ b/akka-persistence/src/main/scala/akka/persistence/PersistentChannel.scala @@ -183,15 +183,16 @@ object PersistentChannel { /** * Plugin API. */ -final case class DeliveredByPersistentChannel( +@deprecated("PersistentChannel will be removed, see `AtLeastOnceDelivery` instead.", since = "2.3.4") +final case class DeliveredByPersistenceChannel( channelId: String, persistentSequenceNr: Long, deliverySequenceNr: Long = 0L, - channel: ActorRef = null) extends Delivered with PersistentId { + channel: ActorRef = null) extends Delivered with PersistenceId { - def processorId: String = channelId + def persistenceId: String = channelId def sequenceNr: Long = persistentSequenceNr - def update(deliverySequenceNr: Long, channel: ActorRef): DeliveredByPersistentChannel = + def update(deliverySequenceNr: Long, channel: ActorRef): DeliveredByPersistenceChannel = copy(deliverySequenceNr = deliverySequenceNr, channel = channel) } @@ -203,25 +204,25 @@ private[persistence] class DeliveredByPersistentChannelBatching(journal: ActorRe private val batchMax = settings.journal.maxConfirmationBatchSize private var batching = false - private var batch = Vector.empty[DeliveredByPersistentChannel] + private var batch = Vector.empty[DeliveredByPersistenceChannel] def receive = { case DeleteMessagesSuccess(messageIds) ⇒ if (batch.isEmpty) batching = false else journalBatch() messageIds.foreach { - case c: DeliveredByPersistentChannel ⇒ + case c: DeliveredByPersistenceChannel ⇒ c.channel ! c if (publish) context.system.eventStream.publish(c) } case DeleteMessagesFailure(_) ⇒ if (batch.isEmpty) batching = false else journalBatch() - case d: DeliveredByPersistentChannel ⇒ + case d: DeliveredByPersistenceChannel ⇒ addToBatch(d) if (!batching || maxBatchSizeReached) journalBatch() case m ⇒ journal forward m } - def addToBatch(pc: DeliveredByPersistentChannel): Unit = + def addToBatch(pc: DeliveredByPersistenceChannel): Unit = batch = batch :+ pc def maxBatchSizeReached: Boolean = @@ -243,16 +244,16 @@ private class RequestWriter(channelId: String, channelSettings: PersistentChanne private val cbJournal = extension.confirmationBatchingJournalForChannel(channelId) - override val processorId = channelId + override val persistenceId = channelId def receive = { case p @ Persistent(Deliver(wrapped: PersistentRepr, _), _) ⇒ - if (!recoveryRunning && wrapped.processorId != PersistentRepr.Undefined) { + if (!recoveryRunning && wrapped.persistenceId != PersistentRepr.Undefined) { // Write a delivery confirmation to the journal so that replayed Deliver // requests from a sending processor are not persisted again. Replaying // Deliver requests is now the responsibility of this processor // and confirmation by destination is done to the wrapper p.sequenceNr. - cbJournal ! DeliveredByChannel(wrapped.processorId, channelId, wrapped.sequenceNr) + cbJournal ! DeliveredByChannel(wrapped.persistenceId, channelId, wrapped.sequenceNr) } if (!recoveryRunning && replyPersistent) @@ -341,8 +342,7 @@ private class RequestReader(channelId: String, channelSettings: PersistentChanne onReplayComplete() } - def processorId: String = - channelId + override def persistenceId: String = channelId def snapshotterId: String = s"${channelId}-reader" @@ -368,7 +368,7 @@ private class RequestReader(channelId: String, channelSettings: PersistentChanne private def onReadRequest(): Unit = if (_currentState == idle) { _currentState = replayStarted(await = false) - dbJournal ! ReplayMessages(lastSequenceNr + 1L, Long.MaxValue, pendingConfirmationsMax - numPending, processorId, self) + dbJournal ! ReplayMessages(lastSequenceNr + 1L, Long.MaxValue, pendingConfirmationsMax - numPending, persistenceId, self) } /** @@ -378,7 +378,7 @@ private class RequestReader(channelId: String, channelSettings: PersistentChanne private def prepareDelivery(wrapped: PersistentRepr, wrapper: PersistentRepr): PersistentRepr = { ConfirmablePersistentImpl(wrapped, confirmTarget = dbJournal, - confirmMessage = DeliveredByPersistentChannel(channelId, wrapper.sequenceNr, channel = self)) + confirmMessage = DeliveredByPersistenceChannel(channelId, wrapper.sequenceNr, channel = self)) } override def preRestart(reason: Throwable, message: Option[Any]): Unit = { diff --git a/akka-persistence/src/main/scala/akka/persistence/Processor.scala b/akka-persistence/src/main/scala/akka/persistence/Processor.scala index dd1e3f980b..7bc180e79b 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Processor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Processor.scala @@ -111,7 +111,7 @@ trait Processor extends Actor with Recovery { def addToBatch(p: Resequenceable): Unit = p match { case p: PersistentRepr ⇒ - processorBatch = processorBatch :+ p.update(processorId = processorId, sequenceNr = nextSequenceNr(), sender = sender()) + processorBatch = processorBatch :+ p.update(persistenceId = persistenceId, sequenceNr = nextSequenceNr(), sender = sender()) case r ⇒ processorBatch = processorBatch :+ r } @@ -136,7 +136,7 @@ trait Processor extends Actor with Recovery { */ private[persistence] def onReplaySuccess(receive: Receive, awaitReplay: Boolean): Unit = { _currentState = initializing - journal ! ReadHighestSequenceNr(lastSequenceNr, processorId, self) + journal ! ReadHighestSequenceNr(lastSequenceNr, persistenceId, self) } /** @@ -157,7 +157,7 @@ trait Processor extends Actor with Recovery { private def onRecoveryCompleted(receive: Receive): Unit = receive.applyOrElse(RecoveryCompleted, unhandled) - private val _processorId = extension.processorId(self) + private val _persistenceId = extension.persistenceId(self) private var processorBatch = Vector.empty[Resequenceable] private var sequenceNr: Long = 0L @@ -165,12 +165,18 @@ trait Processor extends Actor with Recovery { /** * Processor id. Defaults to this processor's path and can be overridden. */ - def processorId: String = _processorId + @deprecated("Override `persistenceId: String` instead. Processor will be removed.", since = "2.3.4") + override def processorId: String = _persistenceId // TODO: remove processorId /** - * Returns `processorId`. + * Persistence id. Defaults to this persistent-actors's path and can be overridden. */ - def snapshotterId: String = processorId + override def persistenceId: String = processorId + + /** + * Returns `persistenceId`. + */ + def snapshotterId: String = persistenceId /** * Returns `true` if this processor is currently recovering. @@ -193,7 +199,7 @@ trait Processor extends Actor with Recovery { * @param sequenceNr sequence number of the persistent message to be deleted. */ def deleteMessage(sequenceNr: Long): Unit = { - deleteMessage(sequenceNr, false) + deleteMessage(sequenceNr, permanent = false) } /** @@ -208,7 +214,7 @@ trait Processor extends Actor with Recovery { * @param permanent if `false`, the message is marked as deleted, otherwise it is permanently deleted. */ def deleteMessage(sequenceNr: Long, permanent: Boolean): Unit = { - journal ! DeleteMessages(List(PersistentIdImpl(processorId, sequenceNr)), permanent) + journal ! DeleteMessages(List(PersistenceIdImpl(persistenceId, sequenceNr)), permanent) } /** @@ -217,7 +223,7 @@ trait Processor extends Actor with Recovery { * @param toSequenceNr upper sequence number bound of persistent messages to be deleted. */ def deleteMessages(toSequenceNr: Long): Unit = { - deleteMessages(toSequenceNr, true) + deleteMessages(toSequenceNr, permanent = true) } /** @@ -229,7 +235,7 @@ trait Processor extends Actor with Recovery { * @param permanent if `false`, the message is marked as deleted, otherwise it is permanently deleted. */ def deleteMessages(toSequenceNr: Long, permanent: Boolean): Unit = { - journal ! DeleteMessagesTo(processorId, toSequenceNr, permanent) + journal ! DeleteMessagesTo(persistenceId, toSequenceNr, permanent) } /** @@ -296,13 +302,13 @@ trait Processor extends Actor with Recovery { message match { case RecoveryCompleted ⇒ // mute case RecoveryFailure(cause) ⇒ - val errorMsg = s"Processor killed after recovery failure (processor id = [${processorId}]). " + + val errorMsg = s"Processor killed after recovery failure (persisten id = [${persistenceId}]). " + "To avoid killing processors on recovery failure, a processor must handle RecoveryFailure messages. " + "RecoveryFailure was caused by: " + cause throw new ActorKilledException(errorMsg) case PersistenceFailure(payload, sequenceNumber, cause) ⇒ val errorMsg = "Processor killed after persistence failure " + - s"(processor id = [${processorId}], sequence nr = [${sequenceNumber}], payload class = [${payload.getClass.getName}]). " + + s"(persistent id = [${persistenceId}], sequence nr = [${sequenceNumber}], payload class = [${payload.getClass.getName}]). " + "To avoid killing processors on persistence failure, a processor must handle PersistenceFailure messages. " + "PersistenceFailure was caused by: " + cause throw new ActorKilledException(errorMsg) diff --git a/akka-persistence/src/main/scala/akka/persistence/Recovery.scala b/akka-persistence/src/main/scala/akka/persistence/Recovery.scala index 138bd46ada..68152cbbdf 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Recovery.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Recovery.scala @@ -87,7 +87,7 @@ trait Recovery extends Actor with Snapshotter with Stash with StashFactory { process(receive, SnapshotOffer(metadata, snapshot)) } _currentState = replayStarted(await = true) - journal ! ReplayMessages(lastSequenceNr + 1L, toSnr, replayMax, processorId, self) + journal ! ReplayMessages(lastSequenceNr + 1L, toSnr, replayMax, persistenceId, self) case other ⇒ receiverStash.stash() } } @@ -171,7 +171,10 @@ trait Recovery extends Actor with Snapshotter with Stash with StashFactory { /** * Id of the processor for which messages should be replayed. */ - def processorId: String + @deprecated("Override `persistenceId` instead. Processor will be removed.", since = "2.3.4") + def processorId: String = extension.persistenceId(self) // TODO: remove processorId + + def persistenceId: String = processorId /** * Returns the current persistent message if there is any. @@ -229,7 +232,7 @@ trait Recovery extends Actor with Snapshotter with Stash with StashFactory { /** * INTERNAL API. */ - private[persistence] lazy val journal = extension.journalFor(processorId) + private[persistence] lazy val journal = extension.journalFor(persistenceId) /** * INTERNAL API. diff --git a/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala b/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala index 851698dce5..0b5ac1a547 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala @@ -8,12 +8,12 @@ package akka.persistence /** * Snapshot metadata. * - * @param processorId id of processor from which the snapshot was taken. + * @param persistenceId id of processor from which the snapshot was taken. * @param sequenceNr sequence number at which the snapshot was taken. * @param timestamp time at which the snapshot was saved. */ @SerialVersionUID(1L) //#snapshot-metadata -final case class SnapshotMetadata(processorId: String, sequenceNr: Long, timestamp: Long = 0L) +final case class SnapshotMetadata(@deprecatedName('processorId) persistenceId: String, sequenceNr: Long, timestamp: Long = 0L) //#snapshot-metadata /** @@ -116,11 +116,11 @@ private[persistence] object SnapshotProtocol { /** * Instructs a snapshot store to load a snapshot. * - * @param processorId processor id. + * @param persistenceId processor id. * @param criteria criteria for selecting a snapshot from which recovery should start. * @param toSequenceNr upper sequence number bound (inclusive) for recovery. */ - final case class LoadSnapshot(processorId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long) + final case class LoadSnapshot(@deprecatedName('processorId) persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long) /** * Response message to a [[LoadSnapshot]] message. @@ -147,8 +147,8 @@ private[persistence] object SnapshotProtocol { /** * Instructs snapshot store to delete all snapshots that match `criteria`. * - * @param processorId processor id. + * @param persistenceId processor id. * @param criteria criteria for selecting snapshots to be deleted. */ - final case class DeleteSnapshots(processorId: String, criteria: SnapshotSelectionCriteria) + final case class DeleteSnapshots(@deprecatedName('processorId) persistenceId: String, criteria: SnapshotSelectionCriteria) } diff --git a/akka-persistence/src/main/scala/akka/persistence/Snapshotter.scala b/akka-persistence/src/main/scala/akka/persistence/Snapshotter.scala index 1118e64822..f03408322b 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Snapshotter.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Snapshotter.scala @@ -23,8 +23,8 @@ trait Snapshotter extends Actor { */ def snapshotSequenceNr: Long - def loadSnapshot(processorId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long) = - snapshotStore ! LoadSnapshot(processorId, criteria, toSequenceNr) + def loadSnapshot(persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long) = + snapshotStore ! LoadSnapshot(persistenceId, criteria, toSequenceNr) /** * Saves a `snapshot` of this snapshotter's state. If saving succeeds, this snapshotter will receive a diff --git a/akka-persistence/src/main/scala/akka/persistence/View.scala b/akka-persistence/src/main/scala/akka/persistence/View.scala index 2a1db64bfa..26ed3a9208 100644 --- a/akka-persistence/src/main/scala/akka/persistence/View.scala +++ b/akka-persistence/src/main/scala/akka/persistence/View.scala @@ -50,7 +50,7 @@ case object Update { * message stream as [[Persistent]] messages. These messages can be processed to update internal state * in order to maintain an (eventual consistent) view of the state of the corresponding processor. A * view can also run on a different node, provided that a replicated journal is used. Implementation - * classes reference a processor by implementing `processorId`. + * classes reference a processor by implementing `persistenceId`. * * Views can also store snapshots of internal state by calling [[#saveSnapshot]]. The snapshots of a view * are independent of those of the referenced processor. During recovery, a saved snapshot is offered @@ -106,7 +106,7 @@ trait View extends Actor with Recovery { case r: Recover ⇒ // ignore case Update(awaitUpdate, replayMax) ⇒ _currentState = replayStarted(await = awaitUpdate) - journal ! ReplayMessages(lastSequenceNr + 1L, Long.MaxValue, replayMax, processorId, self) + journal ! ReplayMessages(lastSequenceNr + 1L, Long.MaxValue, replayMax, persistenceId, self) case other ⇒ process(receive, other) } } @@ -132,7 +132,7 @@ trait View extends Actor with Recovery { if (await) receiverStash.unstashAll() } - private val _viewId = extension.processorId(self) + private val _viewId = extension.persistenceId(self) private val viewSettings = extension.settings.view private var schedule: Option[Cancellable] = None diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala index c9c9a45f49..9724894df2 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala @@ -27,7 +27,7 @@ trait AsyncRecovery { * The channel ids of delivery confirmations that are available for a replayed * message must be contained in that message's `confirms` sequence. * - * @param processorId processor id. + * @param persistenceId processor id. * @param fromSequenceNr sequence number where replay should start (inclusive). * @param toSequenceNr sequence number where replay should end (inclusive). * @param max maximum number of messages to be replayed. @@ -37,16 +37,16 @@ trait AsyncRecovery { * @see [[AsyncWriteJournal]] * @see [[SyncWriteJournal]] */ - def asyncReplayMessages(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: PersistentRepr ⇒ Unit): Future[Unit] + def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: PersistentRepr ⇒ Unit): Future[Unit] /** * Plugin API: asynchronously reads the highest stored sequence number for the - * given `processorId`. + * given `persistenceId`. * - * @param processorId processor id. + * @param persistenceId processor id. * @param fromSequenceNr hint where to start searching for the highest sequence * number. */ - def asyncReadHighestSequenceNr(processorId: String, fromSequenceNr: Long): Future[Long] + def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] //#journal-plugin-api } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index 7676794a8b..a90d95d846 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -43,10 +43,10 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { resequence(WriteMessageFailure(_, e)) } resequencerCounter += resequenceables.length + 1 - case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, processorId, processor, replayDeleted) ⇒ + case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, processor, replayDeleted) ⇒ // Send replayed messages and replay result to processor directly. No need // to resequence replayed messages relative to written and looped messages. - asyncReplayMessages(processorId, fromSequenceNr, toSequenceNr, max) { p ⇒ + asyncReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max) { p ⇒ if (!p.deleted || replayDeleted) processor.tell(ReplayedMessage(p), p.sender) } map { case _ ⇒ ReplayMessagesSuccess @@ -55,10 +55,10 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { } pipeTo (processor) onSuccess { case _ if publish ⇒ context.system.eventStream.publish(r) } - case ReadHighestSequenceNr(fromSequenceNr, processorId, processor) ⇒ + case ReadHighestSequenceNr(fromSequenceNr, persistenceId, processor) ⇒ // Send read highest sequence number to processor directly. No need // to resequence the result relative to written and looped messages. - asyncReadHighestSequenceNr(processorId, fromSequenceNr).map { + asyncReadHighestSequenceNr(persistenceId, fromSequenceNr).map { highest ⇒ ReadHighestSequenceNrSuccess(highest) } recover { case e ⇒ ReadHighestSequenceNrFailure(e) @@ -75,8 +75,8 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { if (publish) context.system.eventStream.publish(d) case Failure(e) ⇒ } - case d @ DeleteMessagesTo(processorId, toSequenceNr, permanent) ⇒ - asyncDeleteMessagesTo(processorId, toSequenceNr, permanent) onComplete { + case d @ DeleteMessagesTo(persistenceId, toSequenceNr, permanent) ⇒ + asyncDeleteMessagesTo(persistenceId, toSequenceNr, permanent) onComplete { case Success(_) ⇒ if (publish) context.system.eventStream.publish(d) case Failure(e) ⇒ } @@ -103,14 +103,14 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { * journal. If `permanent` is set to `false`, the persistent messages are marked as * deleted, otherwise they are permanently deleted. */ - def asyncDeleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean): Future[Unit] + def asyncDeleteMessages(messageIds: immutable.Seq[PersistenceId], permanent: Boolean): Future[Unit] /** * Plugin API: asynchronously deletes all persistent messages up to `toSequenceNr` * (inclusive). If `permanent` is set to `false`, the persistent messages are marked * as deleted, otherwise they are permanently deleted. */ - def asyncDeleteMessagesTo(processorId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit] + def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit] //#journal-plugin-api } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala index 0cb5b8582c..ba1b4ee0ba 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala @@ -43,21 +43,21 @@ private[persistence] trait AsyncWriteProxy extends AsyncWriteJournal with Stash def asyncWriteConfirmations(confirmations: immutable.Seq[PersistentConfirmation]): Future[Unit] = (store ? WriteConfirmations(confirmations)).mapTo[Unit] - def asyncDeleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean): Future[Unit] = + def asyncDeleteMessages(messageIds: immutable.Seq[PersistenceId], permanent: Boolean): Future[Unit] = (store ? DeleteMessages(messageIds, permanent)).mapTo[Unit] - def asyncDeleteMessagesTo(processorId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit] = - (store ? DeleteMessagesTo(processorId, toSequenceNr, permanent)).mapTo[Unit] + def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit] = + (store ? DeleteMessagesTo(persistenceId, toSequenceNr, permanent)).mapTo[Unit] - def asyncReplayMessages(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: (PersistentRepr) ⇒ Unit): Future[Unit] = { + def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: (PersistentRepr) ⇒ Unit): Future[Unit] = { val replayCompletionPromise = Promise[Unit] val mediator = context.actorOf(Props(classOf[ReplayMediator], replayCallback, replayCompletionPromise, timeout.duration).withDeploy(Deploy.local)) - store.tell(ReplayMessages(processorId, fromSequenceNr, toSequenceNr, max), mediator) + store.tell(ReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max), mediator) replayCompletionPromise.future } - def asyncReadHighestSequenceNr(processorId: String, fromSequenceNr: Long): Future[Long] = - (store ? ReadHighestSequenceNr(processorId, fromSequenceNr)).mapTo[Long] + def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = + (store ? ReadHighestSequenceNr(persistenceId, fromSequenceNr)).mapTo[Long] } /** @@ -78,13 +78,13 @@ private[persistence] object AsyncWriteTarget { final case class WriteConfirmations(confirmations: immutable.Seq[PersistentConfirmation]) @SerialVersionUID(1L) - final case class DeleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean) + final case class DeleteMessages(messageIds: immutable.Seq[PersistenceId], permanent: Boolean) @SerialVersionUID(1L) - final case class DeleteMessagesTo(processorId: String, toSequenceNr: Long, permanent: Boolean) + final case class DeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean) @SerialVersionUID(1L) - final case class ReplayMessages(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long) + final case class ReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long) @SerialVersionUID(1L) case object ReplaySuccess @@ -93,7 +93,7 @@ private[persistence] object AsyncWriteTarget { final case class ReplayFailure(cause: Throwable) @SerialVersionUID(1L) - final case class ReadHighestSequenceNr(processorId: String, fromSequenceNr: Long) + final case class ReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long) } /** diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala index 0d43b9e393..e940ffae8a 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala @@ -39,8 +39,8 @@ trait SyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { } throw e } - case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, processorId, processor, replayDeleted) ⇒ - asyncReplayMessages(processorId, fromSequenceNr, toSequenceNr, max) { p ⇒ + case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, processor, replayDeleted) ⇒ + asyncReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max) { p ⇒ if (!p.deleted || replayDeleted) processor.tell(ReplayedMessage(p), p.sender) } map { case _ ⇒ ReplayMessagesSuccess @@ -49,8 +49,8 @@ trait SyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { } pipeTo (processor) onSuccess { case _ if publish ⇒ context.system.eventStream.publish(r) } - case ReadHighestSequenceNr(fromSequenceNr, processorId, processor) ⇒ - asyncReadHighestSequenceNr(processorId, fromSequenceNr).map { + case ReadHighestSequenceNr(fromSequenceNr, persistenceId, processor) ⇒ + asyncReadHighestSequenceNr(persistenceId, fromSequenceNr).map { highest ⇒ ReadHighestSequenceNrSuccess(highest) } recover { case e ⇒ ReadHighestSequenceNrFailure(e) @@ -68,8 +68,8 @@ trait SyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { case Failure(e) ⇒ requestorOption.foreach(_ ! DeleteMessagesFailure(e)) } - case d @ DeleteMessagesTo(processorId, toSequenceNr, permanent) ⇒ - Try(deleteMessagesTo(processorId, toSequenceNr, permanent)) match { + case d @ DeleteMessagesTo(persistenceId, toSequenceNr, permanent) ⇒ + Try(deleteMessagesTo(persistenceId, toSequenceNr, permanent)) match { case Success(_) ⇒ if (publish) context.system.eventStream.publish(d) case Failure(e) ⇒ } @@ -95,13 +95,13 @@ trait SyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { * journal. If `permanent` is set to `false`, the persistent messages are marked as * deleted, otherwise they are permanently deleted. */ - def deleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean): Unit + def deleteMessages(messageIds: immutable.Seq[PersistenceId], permanent: Boolean): Unit /** * Plugin API: synchronously deletes all persistent messages up to `toSequenceNr` * (inclusive). If `permanent` is set to `false`, the persistent messages are marked * as deleted, otherwise they are permanently deleted. */ - def deleteMessagesTo(processorId: String, toSequenceNr: Long, permanent: Boolean): Unit + def deleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Unit //#journal-plugin-api } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala index 327992b22d..51c1e2bd31 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala @@ -37,9 +37,9 @@ private[persistence] trait InmemMessages { // processor id -> persistent message var messages = Map.empty[String, Vector[PersistentRepr]] - def add(p: PersistentRepr) = messages = messages + (messages.get(p.processorId) match { - case Some(ms) ⇒ p.processorId -> (ms :+ p) - case None ⇒ p.processorId -> Vector(p) + def add(p: PersistentRepr) = messages = messages + (messages.get(p.persistenceId) match { + case Some(ms) ⇒ p.persistenceId -> (ms :+ p) + case None ⇒ p.persistenceId -> Vector(p) }) def update(pid: String, snr: Long)(f: PersistentRepr ⇒ PersistentRepr) = messages = messages.get(pid) match { @@ -79,11 +79,11 @@ private[persistence] class InmemStore extends Actor with InmemMessages { case WriteMessages(msgs) ⇒ sender() ! msgs.foreach(add) case WriteConfirmations(cnfs) ⇒ - sender() ! cnfs.foreach(cnf ⇒ update(cnf.processorId, cnf.sequenceNr)(p ⇒ p.update(confirms = cnf.channelId +: p.confirms))) + sender() ! cnfs.foreach(cnf ⇒ update(cnf.persistenceId, cnf.sequenceNr)(p ⇒ p.update(confirms = cnf.channelId +: p.confirms))) case DeleteMessages(msgIds, false) ⇒ - sender() ! msgIds.foreach(msgId ⇒ update(msgId.processorId, msgId.sequenceNr)(_.update(deleted = true))) + sender() ! msgIds.foreach(msgId ⇒ update(msgId.persistenceId, msgId.sequenceNr)(_.update(deleted = true))) case DeleteMessages(msgIds, true) ⇒ - sender() ! msgIds.foreach(msgId ⇒ delete(msgId.processorId, msgId.sequenceNr)) + sender() ! msgIds.foreach(msgId ⇒ delete(msgId.persistenceId, msgId.sequenceNr)) case DeleteMessagesTo(pid, tsnr, false) ⇒ sender() ! (1L to tsnr foreach { snr ⇒ update(pid, snr)(_.update(deleted = true)) }) case DeleteMessagesTo(pid, tsnr, true) ⇒ @@ -91,7 +91,7 @@ private[persistence] class InmemStore extends Actor with InmemMessages { case ReplayMessages(pid, fromSnr, toSnr, max) ⇒ read(pid, fromSnr, toSnr, max).foreach(sender() ! _) sender() ! ReplaySuccess - case ReadHighestSequenceNr(processorId, _) ⇒ - sender() ! highestSequenceNr(processorId) + case ReadHighestSequenceNr(persistenceId, _) ⇒ + sender() ! highestSequenceNr(persistenceId) } } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncRecovery.scala b/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncRecovery.scala index 5b3616dbf5..392a074d85 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncRecovery.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncRecovery.scala @@ -17,11 +17,11 @@ import akka.persistence.PersistentRepr abstract class AsyncRecovery extends SAsyncReplay with AsyncRecoveryPlugin { this: Actor ⇒ import context.dispatcher - final def asyncReplayMessages(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: (PersistentRepr) ⇒ Unit) = - doAsyncReplayMessages(processorId, fromSequenceNr, toSequenceNr, max, new Procedure[PersistentRepr] { + final def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: (PersistentRepr) ⇒ Unit) = + doAsyncReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max, new Procedure[PersistentRepr] { def apply(p: PersistentRepr) = replayCallback(p) }).map(Unit.unbox) - final def asyncReadHighestSequenceNr(processorId: String, fromSequenceNr: Long): Future[Long] = - doAsyncReadHighestSequenceNr(processorId, fromSequenceNr: Long).map(_.longValue) + final def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = + doAsyncReadHighestSequenceNr(persistenceId, fromSequenceNr: Long).map(_.longValue) } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala index 9cdac531e2..e33ba02d2d 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala @@ -22,9 +22,9 @@ abstract class AsyncWriteJournal extends AsyncRecovery with SAsyncWriteJournal w final def asyncWriteConfirmations(confirmations: immutable.Seq[PersistentConfirmation]) = doAsyncWriteConfirmations(confirmations.asJava).map(Unit.unbox) - final def asyncDeleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean) = + final def asyncDeleteMessages(messageIds: immutable.Seq[PersistenceId], permanent: Boolean) = doAsyncDeleteMessages(messageIds.asJava, permanent).map(Unit.unbox) - final def asyncDeleteMessagesTo(processorId: String, toSequenceNr: Long, permanent: Boolean) = - doAsyncDeleteMessagesTo(processorId, toSequenceNr, permanent).map(Unit.unbox) + final def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean) = + doAsyncDeleteMessagesTo(persistenceId, toSequenceNr, permanent).map(Unit.unbox) } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala index 0f9e82e877..033eb0be7c 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala @@ -20,9 +20,9 @@ abstract class SyncWriteJournal extends AsyncRecovery with SSyncWriteJournal wit final def writeConfirmations(confirmations: immutable.Seq[PersistentConfirmation]) = doWriteConfirmations(confirmations.asJava) - final def deleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean) = + final def deleteMessages(messageIds: immutable.Seq[PersistenceId], permanent: Boolean) = doDeleteMessages(messageIds.asJava, permanent) - final def deleteMessagesTo(processorId: String, toSequenceNr: Long, permanent: Boolean) = - doDeleteMessagesTo(processorId, toSequenceNr, permanent) + final def deleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean) = + doDeleteMessagesTo(persistenceId, toSequenceNr, permanent) } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbKey.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbKey.scala index 0a1f98217c..2756a11827 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbKey.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbKey.scala @@ -11,14 +11,14 @@ import java.nio.ByteBuffer * LevelDB key. */ private[leveldb] final case class Key( - processorId: Int, + persistenceId: Int, sequenceNr: Long, channelId: Int) private[leveldb] object Key { def keyToBytes(key: Key): Array[Byte] = { val bb = ByteBuffer.allocate(20) - bb.putInt(key.processorId) + bb.putInt(key.persistenceId) bb.putLong(key.sequenceNr) bb.putInt(key.channelId) bb.array @@ -32,15 +32,15 @@ private[leveldb] object Key { new Key(aid, snr, cid) } - def counterKey(processorId: Int): Key = Key(processorId, 0L, 0) + def counterKey(persistenceId: Int): Key = Key(persistenceId, 0L, 0) def counterToBytes(ctr: Long): Array[Byte] = ByteBuffer.allocate(8).putLong(ctr).array def counterFromBytes(bytes: Array[Byte]): Long = ByteBuffer.wrap(bytes).getLong def id(key: Key) = key.channelId def idKey(id: Int) = Key(1, 0L, id) - def isIdKey(key: Key): Boolean = key.processorId == 1 + def isIdKey(key: Key): Boolean = key.persistenceId == 1 - def deletionKey(processorId: Int, sequenceNr: Long): Key = Key(processorId, sequenceNr, 1) + def deletionKey(persistenceId: Int, sequenceNr: Long): Key = Key(persistenceId, sequenceNr, 1) def isDeletionKey(key: Key): Boolean = key.channelId == 1 } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbRecovery.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbRecovery.scala index 618fa3f65c..5f1acef194 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbRecovery.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbRecovery.scala @@ -22,17 +22,17 @@ private[persistence] trait LeveldbRecovery extends AsyncRecovery { this: Leveldb private lazy val replayDispatcherId = config.getString("replay-dispatcher") private lazy val replayDispatcher = context.system.dispatchers.lookup(replayDispatcherId) - def asyncReadHighestSequenceNr(processorId: String, fromSequenceNr: Long): Future[Long] = { - val nid = numericId(processorId) + def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = { + val nid = numericId(persistenceId) Future(readHighestSequenceNr(nid))(replayDispatcher) } - def asyncReplayMessages(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: PersistentRepr ⇒ Unit): Future[Unit] = { - val nid = numericId(processorId) + def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: PersistentRepr ⇒ Unit): Future[Unit] = { + val nid = numericId(persistenceId) Future(replayMessages(nid, fromSequenceNr: Long, toSequenceNr, max: Long)(replayCallback))(replayDispatcher) } - def replayMessages(processorId: Int, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: PersistentRepr ⇒ Unit): Unit = { + def replayMessages(persistenceId: Int, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: PersistentRepr ⇒ Unit): Unit = { @scala.annotation.tailrec def go(iter: DBIterator, key: Key, ctr: Long, replayCallback: PersistentRepr ⇒ Unit) { if (iter.hasNext) { @@ -43,7 +43,7 @@ private[persistence] trait LeveldbRecovery extends AsyncRecovery { this: Leveldb } else if (nextKey.channelId != 0) { // phantom confirmation (just advance iterator) go(iter, nextKey, ctr, replayCallback) - } else if (key.processorId == nextKey.processorId) { + } else if (key.persistenceId == nextKey.persistenceId) { val msg = persistentFromBytes(nextEntry.getValue) val del = deletion(iter, nextKey) val cnf = confirms(iter, nextKey, Nil) @@ -60,7 +60,7 @@ private[persistence] trait LeveldbRecovery extends AsyncRecovery { this: Leveldb if (iter.hasNext) { val nextEntry = iter.peekNext() val nextKey = keyFromBytes(nextEntry.getKey) - if (key.processorId == nextKey.processorId && key.sequenceNr == nextKey.sequenceNr) { + if (key.persistenceId == nextKey.persistenceId && key.sequenceNr == nextKey.sequenceNr) { val nextValue = new String(nextEntry.getValue, "UTF-8") iter.next() confirms(iter, nextKey, nextValue :: channelIds) @@ -72,7 +72,7 @@ private[persistence] trait LeveldbRecovery extends AsyncRecovery { this: Leveldb if (iter.hasNext) { val nextEntry = iter.peekNext() val nextKey = keyFromBytes(nextEntry.getKey) - if (key.processorId == nextKey.processorId && key.sequenceNr == nextKey.sequenceNr && isDeletionKey(nextKey)) { + if (key.persistenceId == nextKey.persistenceId && key.sequenceNr == nextKey.sequenceNr && isDeletionKey(nextKey)) { iter.next() true } else false @@ -80,16 +80,16 @@ private[persistence] trait LeveldbRecovery extends AsyncRecovery { this: Leveldb } withIterator { iter ⇒ - val startKey = Key(processorId, if (fromSequenceNr < 1L) 1L else fromSequenceNr, 0) + val startKey = Key(persistenceId, if (fromSequenceNr < 1L) 1L else fromSequenceNr, 0) iter.seek(keyToBytes(startKey)) go(iter, startKey, 0L, replayCallback) } } - def readHighestSequenceNr(processorId: Int) = { + def readHighestSequenceNr(persistenceId: Int) = { val ro = leveldbSnapshot() try { - leveldb.get(keyToBytes(counterKey(processorId)), ro) match { + leveldb.get(keyToBytes(counterKey(persistenceId)), ro) match { case null ⇒ 0L case bytes ⇒ counterFromBytes(bytes) } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala index 00d5816f24..bc99a34324 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala @@ -50,15 +50,15 @@ private[persistence] trait LeveldbStore extends Actor with LeveldbIdMapping with def writeConfirmations(confirmations: immutable.Seq[PersistentConfirmation]) = withBatch(batch ⇒ confirmations.foreach(confirmation ⇒ addToConfirmationBatch(confirmation, batch))) - def deleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean) = withBatch { batch ⇒ + def deleteMessages(messageIds: immutable.Seq[PersistenceId], permanent: Boolean) = withBatch { batch ⇒ messageIds foreach { messageId ⇒ - if (permanent) batch.delete(keyToBytes(Key(numericId(messageId.processorId), messageId.sequenceNr, 0))) - else batch.put(keyToBytes(deletionKey(numericId(messageId.processorId), messageId.sequenceNr)), Array.emptyByteArray) + if (permanent) batch.delete(keyToBytes(Key(numericId(messageId.persistenceId), messageId.sequenceNr, 0))) + else batch.put(keyToBytes(deletionKey(numericId(messageId.persistenceId), messageId.sequenceNr)), Array.emptyByteArray) } } - def deleteMessagesTo(processorId: String, toSequenceNr: Long, permanent: Boolean) = withBatch { batch ⇒ - val nid = numericId(processorId) + def deleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean) = withBatch { batch ⇒ + val nid = numericId(persistenceId) // seek to first existing message val fromSequenceNr = withIterator { iter ⇒ @@ -101,13 +101,13 @@ private[persistence] trait LeveldbStore extends Actor with LeveldbIdMapping with def persistentFromBytes(a: Array[Byte]): PersistentRepr = serialization.deserialize(a, classOf[PersistentRepr]).get private def addToMessageBatch(persistent: PersistentRepr, batch: WriteBatch): Unit = { - val nid = numericId(persistent.processorId) + val nid = numericId(persistent.persistenceId) batch.put(keyToBytes(counterKey(nid)), counterToBytes(persistent.sequenceNr)) batch.put(keyToBytes(Key(nid, persistent.sequenceNr, 0)), persistentToBytes(persistent)) } private def addToConfirmationBatch(confirmation: PersistentConfirmation, batch: WriteBatch): Unit = { - val npid = numericId(confirmation.processorId) + val npid = numericId(confirmation.persistenceId) val ncid = numericId(confirmation.channelId) batch.put(keyToBytes(Key(npid, confirmation.sequenceNr, ncid)), confirmation.channelId.getBytes("UTF-8")) } diff --git a/akka-persistence/src/main/scala/akka/persistence/package.scala b/akka-persistence/src/main/scala/akka/persistence/package.scala index e43e131973..3226941d76 100644 --- a/akka-persistence/src/main/scala/akka/persistence/package.scala +++ b/akka-persistence/src/main/scala/akka/persistence/package.scala @@ -8,7 +8,7 @@ package akka package object persistence { implicit val snapshotMetadataOrdering = new Ordering[SnapshotMetadata] { def compare(x: SnapshotMetadata, y: SnapshotMetadata) = - if (x.processorId == y.processorId) math.signum(x.sequenceNr - y.sequenceNr).toInt - else x.processorId.compareTo(y.processorId) + if (x.persistenceId == y.persistenceId) math.signum(x.sequenceNr - y.sequenceNr).toInt + else x.persistenceId.compareTo(y.persistenceId) } } diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala index 5b50b7e367..e8f978090e 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala @@ -30,7 +30,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { val PersistentImplClass = classOf[PersistentImpl] val ConfirmablePersistentImplClass = classOf[ConfirmablePersistentImpl] val DeliveredByTransientChannelClass = classOf[DeliveredByChannel] - val DeliveredByPersistentChannelClass = classOf[DeliveredByPersistentChannel] + val DeliveredByPersistentChannelClass = classOf[DeliveredByPersistenceChannel] val DeliverClass = classOf[Deliver] def identifier: Int = 7 @@ -47,12 +47,12 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { * serialization of a persistent message's payload to a matching `akka.serialization.Serializer`. */ def toBinary(o: AnyRef): Array[Byte] = o match { - case b: PersistentBatch ⇒ persistentMessageBatchBuilder(b).build().toByteArray - case p: PersistentRepr ⇒ persistentMessageBuilder(p).build().toByteArray - case c: DeliveredByChannel ⇒ deliveredMessageBuilder(c).build().toByteArray - case c: DeliveredByPersistentChannel ⇒ deliveredMessageBuilder(c).build().toByteArray - case d: Deliver ⇒ deliverMessageBuilder(d).build.toByteArray - case _ ⇒ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}") + case b: PersistentBatch ⇒ persistentMessageBatchBuilder(b).build().toByteArray + case p: PersistentRepr ⇒ persistentMessageBuilder(p).build().toByteArray + case c: DeliveredByChannel ⇒ deliveredMessageBuilder(c).build().toByteArray + case c: DeliveredByPersistenceChannel ⇒ deliveredMessageBuilder(c).build().toByteArray + case d: Deliver ⇒ deliverMessageBuilder(d).build.toByteArray + case _ ⇒ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}") } /** @@ -95,7 +95,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { private def persistentMessageBuilder(persistent: PersistentRepr) = { val builder = PersistentMessage.newBuilder - if (persistent.processorId != Undefined) builder.setProcessorId(persistent.processorId) + if (persistent.persistenceId != Undefined) builder.setPersistenceId(persistent.persistenceId) if (persistent.confirmMessage != null) builder.setConfirmMessage(deliveredMessageBuilder(persistent.confirmMessage)) if (persistent.confirmTarget != null) builder.setConfirmTarget(Serialization.serializedActorPath(persistent.confirmTarget)) if (persistent.sender != null) builder.setSender(Serialization.serializedActorPath(persistent.sender)) @@ -115,7 +115,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { val serializer = SerializationExtension(system).findSerializerFor(payload) val builder = PersistentPayload.newBuilder() - if (serializer.includeManifest) builder.setPayloadManifest((ByteString.copyFromUtf8(payload.getClass.getName))) + if (serializer.includeManifest) builder.setPayloadManifest(ByteString.copyFromUtf8(payload.getClass.getName)) builder.setPayload(ByteString.copyFrom(serializer.toBinary(payload))) builder.setSerializerId(serializer.identifier) @@ -139,7 +139,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { builder.setDeliverySequenceNr(delivered.deliverySequenceNr) delivered match { - case c: DeliveredByChannel ⇒ builder.setProcessorId(c.processorId) + case c: DeliveredByChannel ⇒ builder.setPersistenceId(c.persistenceId) case _ ⇒ builder } } @@ -161,7 +161,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { PersistentRepr( payload(persistentMessage.getPayload), persistentMessage.getSequenceNr, - if (persistentMessage.hasProcessorId) persistentMessage.getProcessorId else Undefined, + if (persistentMessage.hasPersistenceId) persistentMessage.getPersistenceId else Undefined, persistentMessage.getDeleted, persistentMessage.getRedeliveries, immutableSeq(persistentMessage.getConfirmsList), @@ -184,15 +184,15 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { private def delivered(deliveredMessage: DeliveredMessage): Delivered = { val channel = if (deliveredMessage.hasChannel) system.provider.resolveActorRef(deliveredMessage.getChannel) else null - if (deliveredMessage.hasProcessorId) { + if (deliveredMessage.hasPersistenceId) { DeliveredByChannel( - deliveredMessage.getProcessorId, + deliveredMessage.getPersistenceId, deliveredMessage.getChannelId, deliveredMessage.getPersistentSequenceNr, deliveredMessage.getDeliverySequenceNr, channel) } else { - DeliveredByPersistentChannel( + DeliveredByPersistenceChannel( deliveredMessage.getChannelId, deliveredMessage.getPersistentSequenceNr, deliveredMessage.getDeliverySequenceNr, diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala index 297158ac4f..3203fc0068 100644 --- a/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala @@ -22,13 +22,13 @@ trait SnapshotStore extends Actor { private val publish = extension.settings.internal.publishPluginCommands final def receive = { - case LoadSnapshot(processorId, criteria, toSequenceNr) ⇒ + case LoadSnapshot(persistenceId, criteria, toSequenceNr) ⇒ val p = sender() - loadAsync(processorId, criteria.limit(toSequenceNr)) map { + loadAsync(persistenceId, criteria.limit(toSequenceNr)) map { sso ⇒ LoadSnapshotResult(sso, toSequenceNr) } recover { case e ⇒ LoadSnapshotResult(None, toSequenceNr) - } pipeTo (p) + } pipeTo p case SaveSnapshot(metadata, snapshot) ⇒ val p = sender() val md = metadata.copy(timestamp = System.currentTimeMillis) @@ -58,7 +58,7 @@ trait SnapshotStore extends Actor { * @param processorId processor id. * @param criteria selection criteria for loading. */ - def loadAsync(processorId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] + def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] /** * Plugin API: asynchronously saves a snapshot. @@ -86,9 +86,9 @@ trait SnapshotStore extends Actor { /** * Plugin API: deletes all snapshots matching `criteria`. * - * @param processorId processor id. + * @param persistenceId processor id. * @param criteria selection criteria for deleting. */ - def delete(processorId: String, criteria: SnapshotSelectionCriteria) + def delete(persistenceId: String, criteria: SnapshotSelectionCriteria) //#snapshot-store-plugin-api } diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/japi/SnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/japi/SnapshotStore.scala index e00c2038db..1d693eec61 100644 --- a/akka-persistence/src/main/scala/akka/persistence/snapshot/japi/SnapshotStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/japi/SnapshotStore.scala @@ -16,8 +16,8 @@ import akka.persistence.snapshot.{ SnapshotStore ⇒ SSnapshotStore } abstract class SnapshotStore extends SSnapshotStore with SnapshotStorePlugin { import context.dispatcher - final def loadAsync(processorId: String, criteria: SnapshotSelectionCriteria) = - doLoadAsync(processorId, criteria).map(_.asScala) + final def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria) = + doLoadAsync(persistenceId, criteria).map(_.asScala) final def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = doSaveAsync(metadata, snapshot).map(Unit.unbox) @@ -28,7 +28,7 @@ abstract class SnapshotStore extends SSnapshotStore with SnapshotStorePlugin { final def delete(metadata: SnapshotMetadata) = doDelete(metadata) - final def delete(processorId: String, criteria: SnapshotSelectionCriteria) = - doDelete(processorId: String, criteria: SnapshotSelectionCriteria) + final def delete(persistenceId: String, criteria: SnapshotSelectionCriteria) = + doDelete(persistenceId: String, criteria: SnapshotSelectionCriteria) } diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala index e43bd6b17a..40a41c20e5 100644 --- a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala @@ -33,7 +33,7 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo private val serializationExtension = SerializationExtension(context.system) private var saving = immutable.Set.empty[SnapshotMetadata] // saving in progress - def loadAsync(processorId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = { + def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = { // // Heuristics: // @@ -44,7 +44,7 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo // // TODO: make number of loading attempts configurable // - val metadata = snapshotMetadata(processorId, criteria).sorted.takeRight(3) + val metadata = snapshotMetadata(persistenceId, criteria).sorted.takeRight(3) Future(load(metadata))(streamDispatcher) } @@ -62,8 +62,8 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo snapshotFile(metadata).delete() } - def delete(processorId: String, criteria: SnapshotSelectionCriteria) = { - snapshotMetadata(processorId, criteria).foreach(delete) + def delete(persistenceId: String, criteria: SnapshotSelectionCriteria) = { + snapshotMetadata(persistenceId, criteria).foreach(delete) } @scala.annotation.tailrec @@ -102,10 +102,10 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo try { p(stream) } finally { stream.close() } private def snapshotFile(metadata: SnapshotMetadata, extension: String = ""): File = - new File(snapshotDir, s"snapshot-${URLEncoder.encode(metadata.processorId, "UTF-8")}-${metadata.sequenceNr}-${metadata.timestamp}${extension}") + new File(snapshotDir, s"snapshot-${URLEncoder.encode(metadata.persistenceId, "UTF-8")}-${metadata.sequenceNr}-${metadata.timestamp}${extension}") - private def snapshotMetadata(processorId: String, criteria: SnapshotSelectionCriteria): immutable.Seq[SnapshotMetadata] = - snapshotDir.listFiles(new SnapshotFilenameFilter(processorId)).map(_.getName).collect { + private def snapshotMetadata(persistenceId: String, criteria: SnapshotSelectionCriteria): immutable.Seq[SnapshotMetadata] = + snapshotDir.listFiles(new SnapshotFilenameFilter(persistenceId)).map(_.getName).collect { case FilenamePattern(pid, snr, tms) ⇒ SnapshotMetadata(URLDecoder.decode(pid, "UTF-8"), snr.toLong, tms.toLong) }.filter(md ⇒ criteria.matches(md) && !saving.contains(md)).toVector @@ -119,7 +119,7 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo super.preStart() } - private class SnapshotFilenameFilter(processorId: String) extends FilenameFilter { - def accept(dir: File, name: String): Boolean = name.startsWith(s"snapshot-${URLEncoder.encode(processorId, "UTF-8")}") + private class SnapshotFilenameFilter(persistenceId: String) extends FilenameFilter { + def accept(dir: File, name: String): Boolean = name.startsWith(s"snapshot-${URLEncoder.encode(persistenceId, "UTF-8")}") } } diff --git a/akka-persistence/src/test/scala/akka/persistence/FailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/FailureSpec.scala index 40fea2a963..26053d1a4c 100644 --- a/akka-persistence/src/test/scala/akka/persistence/FailureSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/FailureSpec.scala @@ -61,7 +61,7 @@ object FailureSpec { val channel = context.actorOf(Channel.props("channel", ChannelSettings(redeliverMax = 10, redeliverInterval = 500 milliseconds)), "channel") - override def processorId = "chaos" + override def persistenceId = "chaos" def receive = { case p @ Persistent(i: Int, _) ⇒ diff --git a/akka-persistence/src/test/scala/akka/persistence/NumberProcessorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/NumberProcessorSpec.scala index 22e725c183..06ad3a9ab2 100644 --- a/akka-persistence/src/test/scala/akka/persistence/NumberProcessorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/NumberProcessorSpec.scala @@ -22,7 +22,6 @@ object NumberProcessorSpec { case object GetNumber class NumberProcessorWithPersistentChannel(name: String) extends NamedProcessor(name) { - override def processorId = name var num = 0 val channel = context.actorOf(PersistentChannel.props(channelId = "stable_id", @@ -52,7 +51,7 @@ class NumberProcessorSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "Nu "resurrect with the correct state, not replaying confirmed messages to clients" in { val deliveredProbe = TestProbe() - system.eventStream.subscribe(deliveredProbe.testActor, classOf[DeliveredByPersistentChannel]) + system.eventStream.subscribe(deliveredProbe.testActor, classOf[DeliveredByPersistenceChannel]) val probe = TestProbe() @@ -63,7 +62,7 @@ class NumberProcessorSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "Nu zero.confirm() zero.payload should equal(0) - deliveredProbe.expectMsgType[DeliveredByPersistentChannel] + deliveredProbe.expectMsgType[DeliveredByPersistenceChannel] processor.tell(Persistent(DecrementAndGet), probe.testActor) @@ -71,7 +70,7 @@ class NumberProcessorSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "Nu decrementFrom0.confirm() decrementFrom0.payload should equal(-1) - deliveredProbe.expectMsgType[DeliveredByPersistentChannel] + deliveredProbe.expectMsgType[DeliveredByPersistenceChannel] watch(processor) system.stop(processor) @@ -84,7 +83,7 @@ class NumberProcessorSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "Nu decrementFromMinus1.confirm() decrementFromMinus1.payload should equal(-2) - deliveredProbe.expectMsgType[DeliveredByPersistentChannel] + deliveredProbe.expectMsgType[DeliveredByPersistenceChannel] } } } diff --git a/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala index d1dc7e1c35..86484691e7 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala @@ -30,8 +30,8 @@ object PerformanceSpec { var startTime: Long = 0L var stopTime: Long = 0L - var startSequenceNr = 0L; - var stopSequenceNr = 0L; + var startSequenceNr = 0L + var stopSequenceNr = 0L def startMeasure(): Unit = { startSequenceNr = lastSequenceNr @@ -169,9 +169,9 @@ class PerformanceSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "Perfor def stressPersistentChannel(): Unit = { val channel = system.actorOf(PersistentChannel.props()) val destination = system.actorOf(Props[PerformanceTestDestination]) - 1 to warmupCycles foreach { i ⇒ channel ! Deliver(PersistentRepr(s"msg${i}", processorId = "test"), destination.path) } + 1 to warmupCycles foreach { i ⇒ channel ! Deliver(PersistentRepr(s"msg${i}", persistenceId = "test"), destination.path) } channel ! Deliver(Persistent(StartMeasure), destination.path) - 1 to loadCycles foreach { i ⇒ channel ! Deliver(PersistentRepr(s"msg${i}", processorId = "test"), destination.path) } + 1 to loadCycles foreach { i ⇒ channel ! Deliver(PersistentRepr(s"msg${i}", persistenceId = "test"), destination.path) } channel ! Deliver(Persistent(StopMeasure), destination.path) expectMsgPF(100 seconds) { case throughput: Double ⇒ println(f"\nthroughput = $throughput%.2f persistent messages per second") @@ -179,10 +179,10 @@ class PerformanceSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "Perfor } def subscribeToConfirmation(probe: TestProbe): Unit = - system.eventStream.subscribe(probe.ref, classOf[DeliveredByPersistentChannel]) + system.eventStream.subscribe(probe.ref, classOf[DeliveredByPersistenceChannel]) def awaitConfirmation(probe: TestProbe): Unit = - probe.expectMsgType[DeliveredByPersistentChannel] + probe.expectMsgType[DeliveredByPersistenceChannel] "A command sourced processor" should { "have some reasonable throughput" in { @@ -213,7 +213,7 @@ class PerformanceSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "Perfor stressPersistentChannel() probe.fishForMessage(100.seconds) { - case DeliveredByPersistentChannel(_, snr, _, _) ⇒ snr == warmupCycles + loadCycles + 2 + case DeliveredByPersistenceChannel(_, snr, _, _) ⇒ snr == warmupCycles + loadCycles + 2 } } } diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala index 66e32c8707..c4ec7a2449 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala @@ -77,7 +77,7 @@ trait Cleanup { this: AkkaSpec ⇒ } abstract class NamedProcessor(name: String) extends Processor { - override def processorId: String = name + override def persistenceId: String = name } trait TurnOffRecoverOnStart { this: Processor ⇒ diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentChannelSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentChannelSpec.scala index e4e081b775..bc0e273f37 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentChannelSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentChannelSpec.scala @@ -71,8 +71,8 @@ abstract class PersistentChannelSpec(config: Config) extends ChannelSpec(config) "not modify certain persistent message fields" in { val destProbe = TestProbe() - val persistent1 = PersistentRepr(payload = "a", processorId = "p1", confirms = List("c1", "c2"), sender = defaultTestChannel, sequenceNr = 13) - val persistent2 = PersistentRepr(payload = "b", processorId = "p1", confirms = List("c1", "c2"), sender = defaultTestChannel) + val persistent1 = PersistentRepr(payload = "a", persistenceId = "p1", confirms = List("c1", "c2"), sender = defaultTestChannel, sequenceNr = 13) + val persistent2 = PersistentRepr(payload = "b", persistenceId = "p1", confirms = List("c1", "c2"), sender = defaultTestChannel) defaultTestChannel ! Deliver(persistent1, destProbe.ref.path) defaultTestChannel ! Deliver(persistent2, destProbe.ref.path) diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotDirectoryFailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotDirectoryFailureSpec.scala index de60fc8dfa..8b4863e2c5 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotDirectoryFailureSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotDirectoryFailureSpec.scala @@ -13,7 +13,7 @@ object SnapshotDirectoryFailureSpec { class TestProcessor(name: String, probe: ActorRef) extends Processor { - override def processorId: String = name + override def persistenceId: String = name override def preStart(): Unit = () diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala index 56b69afbd8..0746352130 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala @@ -53,7 +53,7 @@ class SnapshotFailureRobustnessSpec extends AkkaSpec(PersistenceSpec.config("lev "A processor with a failing snapshot" must { "recover state starting from the most recent complete snapshot" in { val sProcessor = system.actorOf(Props(classOf[SaveSnapshotTestProcessor], name, testActor)) - val processorId = name + val persistenceId = name expectMsg(RecoveryCompleted) sProcessor ! Persistent("blahonga") @@ -67,7 +67,7 @@ class SnapshotFailureRobustnessSpec extends AkkaSpec(PersistenceSpec.config("lev val lProcessor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor)) lProcessor ! Recover() expectMsgPF() { - case (SnapshotMetadata(`processorId`, 1, timestamp), state) ⇒ + case (SnapshotMetadata(`persistenceId`, 1, timestamp), state) ⇒ state should be("blahonga") timestamp should be > (0L) } diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala index 13beeb8d0e..f776da7490 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala @@ -77,14 +77,14 @@ class SnapshotSerializationSpec extends AkkaSpec(PersistenceSpec.config("leveldb "A processor with custom Serializer" must { "be able to handle serialization header of more than 255 bytes" in { val sProcessor = system.actorOf(Props(classOf[TestProcessor], name, testActor)) - val processorId = name + val persistenceId = name sProcessor ! "blahonga" expectMsg(0) val lProcessor = system.actorOf(Props(classOf[TestProcessor], name, testActor)) lProcessor ! Recover() expectMsgPF() { - case (SnapshotMetadata(`processorId`, 0, timestamp), state) ⇒ + case (SnapshotMetadata(`persistenceId`, 0, timestamp), state) ⇒ state should be(new MySnapshot("blahonga")) timestamp should be > (0L) } diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala index 5176f60cd0..2df4db483f 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala @@ -64,12 +64,12 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS "A processor" must { "recover state starting from the most recent snapshot" in { val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor)) - val processorId = name + val persistenceId = name processor ! Recover() expectMsgPF() { - case (SnapshotMetadata(`processorId`, 4, timestamp), state) ⇒ + case (SnapshotMetadata(`persistenceId`, 4, timestamp), state) ⇒ state should be(List("a-1", "b-2", "c-3", "d-4").reverse) timestamp should be > (0L) } @@ -79,12 +79,12 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS } "recover state starting from the most recent snapshot matching an upper sequence number bound" in { val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor)) - val processorId = name + val persistenceId = name processor ! Recover(toSequenceNr = 3) expectMsgPF() { - case (SnapshotMetadata(`processorId`, 2, timestamp), state) ⇒ + case (SnapshotMetadata(`persistenceId`, 2, timestamp), state) ⇒ state should be(List("a-1", "b-2").reverse) timestamp should be > (0L) } @@ -93,13 +93,13 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS } "recover state starting from the most recent snapshot matching an upper sequence number bound (without further replay)" in { val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor)) - val processorId = name + val persistenceId = name processor ! Recover(toSequenceNr = 4) processor ! "done" expectMsgPF() { - case (SnapshotMetadata(`processorId`, 4, timestamp), state) ⇒ + case (SnapshotMetadata(`persistenceId`, 4, timestamp), state) ⇒ state should be(List("a-1", "b-2", "c-3", "d-4").reverse) timestamp should be > (0L) } @@ -108,12 +108,12 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS } "recover state starting from the most recent snapshot matching criteria" in { val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor)) - val processorId = name + val persistenceId = name processor ! Recover(fromSnapshot = SnapshotSelectionCriteria(maxSequenceNr = 2)) expectMsgPF() { - case (SnapshotMetadata(`processorId`, 2, timestamp), state) ⇒ + case (SnapshotMetadata(`persistenceId`, 2, timestamp), state) ⇒ state should be(List("a-1", "b-2").reverse) timestamp should be > (0L) } @@ -125,12 +125,12 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS } "recover state starting from the most recent snapshot matching criteria and an upper sequence number bound" in { val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor)) - val processorId = name + val persistenceId = name processor ! Recover(fromSnapshot = SnapshotSelectionCriteria(maxSequenceNr = 2), toSequenceNr = 3) expectMsgPF() { - case (SnapshotMetadata(`processorId`, 2, timestamp), state) ⇒ + case (SnapshotMetadata(`persistenceId`, 2, timestamp), state) ⇒ state should be(List("a-1", "b-2").reverse) timestamp should be > (0L) } @@ -151,7 +151,7 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS val deleteProbe = TestProbe() val processor1 = system.actorOf(Props(classOf[DeleteSnapshotTestProcessor], name, testActor)) - val processorId = name + val persistenceId = name system.eventStream.subscribe(deleteProbe.ref, classOf[DeleteSnapshot]) @@ -160,7 +160,7 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS processor1 ! "done" val metadata = expectMsgPF() { - case (md @ SnapshotMetadata(`processorId`, 4, _), state) ⇒ + case (md @ SnapshotMetadata(`persistenceId`, 4, _), state) ⇒ state should be(List("a-1", "b-2", "c-3", "d-4").reverse) md } @@ -175,7 +175,7 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS processor2 ! Recover(toSequenceNr = 4) expectMsgPF() { - case (md @ SnapshotMetadata(`processorId`, 2, _), state) ⇒ + case (md @ SnapshotMetadata(`persistenceId`, 2, _), state) ⇒ state should be(List("a-1", "b-2").reverse) md } @@ -187,7 +187,7 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS val deleteProbe = TestProbe() val processor1 = system.actorOf(Props(classOf[DeleteSnapshotTestProcessor], name, testActor)) - val processorId = name + val persistenceId = name system.eventStream.subscribe(deleteProbe.ref, classOf[DeleteSnapshots]) @@ -195,7 +195,7 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS processor1 ! Recover(toSequenceNr = 4) processor1 ! DeleteN(SnapshotSelectionCriteria(maxSequenceNr = 4)) expectMsgPF() { - case (md @ SnapshotMetadata(`processorId`, 4, _), state) ⇒ + case (md @ SnapshotMetadata(`persistenceId`, 4, _), state) ⇒ state should be(List("a-1", "b-2", "c-3", "d-4").reverse) } expectMsg(RecoveryCompleted) diff --git a/akka-persistence/src/test/scala/akka/persistence/ViewSpec.scala b/akka-persistence/src/test/scala/akka/persistence/ViewSpec.scala index e1ead270eb..2fc01670ef 100644 --- a/akka-persistence/src/test/scala/akka/persistence/ViewSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/ViewSpec.scala @@ -27,7 +27,7 @@ object ViewSpec { this(name, probe, 100.milliseconds) override def autoUpdateInterval: FiniteDuration = interval.dilated(context.system) - override val processorId: String = name + override val persistenceId: String = name var last: String = _ @@ -50,7 +50,7 @@ object ViewSpec { } class PassiveTestView(name: String, probe: ActorRef, var failAt: Option[String]) extends View { - override val processorId: String = name + override val persistenceId: String = name override def autoUpdate: Boolean = false override def autoUpdateReplayMax: Long = 0L // no message replay during initial recovery @@ -70,10 +70,11 @@ object ViewSpec { super.postRestart(reason) failAt = None } + } class ActiveTestView(name: String, probe: ActorRef) extends View { - override val processorId: String = name + override val persistenceId: String = name override def autoUpdateInterval: FiniteDuration = 50.millis override def autoUpdateReplayMax: Long = 2 @@ -92,8 +93,8 @@ object ViewSpec { } class EmittingView(name: String, destination: ActorRef) extends View { + override val persistenceId: String = name override def autoUpdateInterval: FiniteDuration = 100.milliseconds.dilated(context.system) - override val processorId: String = name val channel = context.actorOf(Channel.props(s"${name}-channel")) @@ -106,10 +107,11 @@ object ViewSpec { } class SnapshottingView(name: String, probe: ActorRef) extends View { - override def autoUpdateInterval: FiniteDuration = 100.microseconds.dilated(context.system) - override val processorId: String = name + override val persistenceId: String = name override val viewId: String = s"${name}-replicator" + override def autoUpdateInterval: FiniteDuration = 100.microseconds.dilated(context.system) + var last: String = _ def receive = { diff --git a/akka-persistence/src/test/scala/akka/persistence/journal/chaos/ChaosJournal.scala b/akka-persistence/src/test/scala/akka/persistence/journal/chaos/ChaosJournal.scala index f7b45a3907..e41ca93fdb 100644 --- a/akka-persistence/src/test/scala/akka/persistence/journal/chaos/ChaosJournal.scala +++ b/akka-persistence/src/test/scala/akka/persistence/journal/chaos/ChaosJournal.scala @@ -16,7 +16,7 @@ class WriteFailedException(ps: Seq[PersistentRepr]) extends TestException(s"write failed for payloads = [${ps.map(_.payload)}]") class ConfirmFailedException(cs: Seq[PersistentConfirmation]) - extends TestException(s"write failed for confirmations = [${cs.map(c ⇒ s"${c.processorId}-${c.sequenceNr}-${c.channelId}")}]") + extends TestException(s"write failed for confirmations = [${cs.map(c ⇒ s"${c.persistenceId}-${c.sequenceNr}-${c.channelId}")}]") class ReplayFailedException(ps: Seq[PersistentRepr]) extends TestException(s"recovery failed after replaying payloads = [${ps.map(_.payload)}]") @@ -24,7 +24,7 @@ class ReplayFailedException(ps: Seq[PersistentRepr]) class ReadHighestFailedException extends TestException(s"recovery failed when reading highest sequence number") -class DeleteFailedException(messageIds: immutable.Seq[PersistentId]) +class DeleteFailedException(messageIds: immutable.Seq[PersistenceId]) extends TestException(s"delete failed for message ids = [${messageIds}]") /** @@ -51,30 +51,30 @@ class ChaosJournal extends SyncWriteJournal { def writeConfirmations(confirmations: immutable.Seq[PersistentConfirmation]): Unit = if (shouldFail(confirmFailureRate)) throw new ConfirmFailedException(confirmations) - else confirmations.foreach(cnf ⇒ update(cnf.processorId, cnf.sequenceNr)(p ⇒ p.update(confirms = cnf.channelId +: p.confirms))) + else confirmations.foreach(cnf ⇒ update(cnf.persistenceId, cnf.sequenceNr)(p ⇒ p.update(confirms = cnf.channelId +: p.confirms))) - def deleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean): Unit = + def deleteMessages(messageIds: immutable.Seq[PersistenceId], permanent: Boolean): Unit = if (shouldFail(deleteFailureRate)) throw new DeleteFailedException(messageIds) - else if (permanent) messageIds.foreach(mid ⇒ update(mid.processorId, mid.sequenceNr)(_.update(deleted = true))) - else messageIds.foreach(mid ⇒ del(mid.processorId, mid.sequenceNr)) + else if (permanent) messageIds.foreach(mid ⇒ update(mid.persistenceId, mid.sequenceNr)(_.update(deleted = true))) + else messageIds.foreach(mid ⇒ del(mid.persistenceId, mid.sequenceNr)) - def deleteMessagesTo(processorId: String, toSequenceNr: Long, permanent: Boolean): Unit = - (1L to toSequenceNr).map(PersistentIdImpl(processorId, _)) + def deleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Unit = + (1L to toSequenceNr).map(PersistenceIdImpl(persistenceId, _)) - def asyncReplayMessages(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: (PersistentRepr) ⇒ Unit): Future[Unit] = + def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: (PersistentRepr) ⇒ Unit): Future[Unit] = if (shouldFail(replayFailureRate)) { - val rm = read(processorId, fromSequenceNr, toSequenceNr, max) + val rm = read(persistenceId, fromSequenceNr, toSequenceNr, max) val sm = rm.take(random.nextInt(rm.length + 1)) sm.foreach(replayCallback) Future.failed(new ReplayFailedException(sm)) } else { - read(processorId, fromSequenceNr, toSequenceNr, max).foreach(replayCallback) + read(persistenceId, fromSequenceNr, toSequenceNr, max).foreach(replayCallback) Future.successful(()) } - def asyncReadHighestSequenceNr(processorId: String, fromSequenceNr: Long): Future[Long] = + def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = if (shouldFail(readHighestFailureRate)) Future.failed(new ReadHighestFailedException) - else Future.successful(highestSequenceNr(processorId)) + else Future.successful(highestSequenceNr(persistenceId)) def shouldFail(rate: Double): Boolean = random.nextDouble() < rate diff --git a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala index 91f5797f45..61ce089fcf 100644 --- a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala @@ -124,11 +124,11 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) { deserialized should be(confirmation) } "handle DeliveredByPersistentChannel message serialization" in { - val confirmation = DeliveredByPersistentChannel("c2", 14) + val confirmation = DeliveredByPersistenceChannel("c2", 14) val serializer = serialization.findSerializerFor(confirmation) val bytes = serializer.toBinary(confirmation) - val deserialized = serializer.fromBinary(bytes, Some(classOf[DeliveredByPersistentChannel])) + val deserialized = serializer.fromBinary(bytes, Some(classOf[DeliveredByPersistenceChannel])) deserialized should be(confirmation) } @@ -149,7 +149,7 @@ object MessageSerializerRemotingSpec { case ConfirmablePersistent(MyPayload(data), _, _) ⇒ sender() ! s"c${data}" case Persistent(MyPayload(data), _) ⇒ sender() ! s"p${data}" case DeliveredByChannel(pid, cid, msnr, dsnr, ep) ⇒ sender() ! s"${pid},${cid},${msnr},${dsnr},${ep.path.name.startsWith("testActor")}" - case DeliveredByPersistentChannel(cid, msnr, dsnr, ep) ⇒ sender() ! s"${cid},${msnr},${dsnr},${ep.path.name.startsWith("testActor")}" + case DeliveredByPersistenceChannel(cid, msnr, dsnr, ep) ⇒ sender() ! s"${cid},${msnr},${dsnr},${ep.path.name.startsWith("testActor")}" case Deliver(Persistent(payload, _), dp) ⇒ context.actorSelection(dp) ! payload } } @@ -194,7 +194,7 @@ class MessageSerializerRemotingSpec extends AkkaSpec(remote.withFallback(customS expectMsg("a,b,2,3,true") } "serialize DeliveredByPersistentChannel messages during remoting" in { - localActor ! DeliveredByPersistentChannel("c", 2, 3, testActor) + localActor ! DeliveredByPersistenceChannel("c", 2, 3, testActor) expectMsg("c,2,3,true") } "serialize Deliver messages during remoting" in { diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java index a943427f2f..09e6753e24 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java @@ -24,9 +24,9 @@ public class LambdaPersistenceDocTest { public interface SomeOtherMessage {} public interface ProcessorMethods { - //#processor-id - public String processorId(); - //#processor-id + //#persistence-id + public String persistenceId(); + //#persistence-id //#recovery-status public boolean recoveryRunning(); public boolean recoveryFinished(); @@ -121,13 +121,13 @@ public class LambdaPersistenceDocTest { } class MyProcessor4 extends AbstractProcessor implements ProcessorMethods { - //#processor-id-override + //#persistence-id-override @Override - public String processorId() { - return "my-stable-processor-id"; + public String persistenceId() { + return "my-stable-persistence-id"; } - //#processor-id-override + //#persistence-id-override public MyProcessor4() { receive(ReceiveBuilder. match(Persistent.class, received -> {/* ... */}).build() @@ -494,8 +494,8 @@ public class LambdaPersistenceDocTest { //#view class MyView extends AbstractView { @Override - public String processorId() { - return "some-processor-id"; + public String persistenceId() { + return "some-persistence-id"; } public MyView() { diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java index 3999650804..d3cd3998c8 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java @@ -6,7 +6,6 @@ package doc; //#plugin-imports import akka.japi.pf.ReceiveBuilder; -import scala.PartialFunction; import scala.concurrent.Future; import akka.japi.Option; import akka.japi.Procedure; @@ -17,7 +16,6 @@ import akka.persistence.snapshot.japi.*; import akka.actor.*; import akka.persistence.journal.leveldb.SharedLeveldbJournal; import akka.persistence.journal.leveldb.SharedLeveldbStore; -import scala.runtime.BoxedUnit; public class LambdaPersistencePluginDocTest { @@ -55,7 +53,7 @@ public class LambdaPersistencePluginDocTest { class MySnapshotStore extends SnapshotStore { @Override - public Future> doLoadAsync(String processorId, SnapshotSelectionCriteria criteria) { + public Future> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria) { return null; } @@ -73,7 +71,7 @@ public class LambdaPersistencePluginDocTest { } @Override - public void doDelete(String processorId, SnapshotSelectionCriteria criteria) throws Exception { + public void doDelete(String persistenceId, SnapshotSelectionCriteria criteria) throws Exception { } } @@ -89,17 +87,17 @@ public class LambdaPersistencePluginDocTest { } @Override - public Future doAsyncDeleteMessages(Iterable messageIds, boolean permanent) { + public Future doAsyncDeleteMessages(Iterable messageIds, boolean permanent) { return null; } @Override - public Future doAsyncDeleteMessagesTo(String processorId, long toSequenceNr, boolean permanent) { + public Future doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent) { return null; } @Override - public Future doAsyncReplayMessages(String processorId, long fromSequenceNr, + public Future doAsyncReplayMessages(String persistenceId, long fromSequenceNr, long toSequenceNr, long max, Procedure replayCallback) { @@ -107,7 +105,7 @@ public class LambdaPersistencePluginDocTest { } @Override - public Future doAsyncReadHighestSequenceNr(String processorId, long fromSequenceNr) { + public Future doAsyncReadHighestSequenceNr(String persistenceId, long fromSequenceNr) { return null; } } diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ViewExample.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ViewExample.java index 7344f5efb5..2fe074111e 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ViewExample.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ViewExample.java @@ -45,7 +45,7 @@ public class ViewExample { } @Override - public String processorId() { + public String persistenceId() { return "processor-5"; } diff --git a/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/ViewExample.java b/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/ViewExample.java index c231b3699a..b132a5d652 100644 --- a/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/ViewExample.java +++ b/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/ViewExample.java @@ -35,7 +35,7 @@ public class ViewExample { } @Override - public String processorId() { + public String persistenceId() { return "processor-5"; } diff --git a/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ViewExample.scala b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ViewExample.scala index 4699a7fbbc..edaaec1ac5 100644 --- a/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ViewExample.scala +++ b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ViewExample.scala @@ -18,7 +18,7 @@ object ViewExample extends App { class ExampleView extends View { private var numReplicated = 0 - override def processorId = "processor-5" + override def persistenceId: String = "processor-5" override def viewId = "view-5" private val destination = context.actorOf(Props[ExampleDestination]) @@ -35,6 +35,7 @@ object ViewExample extends App { println(s"view received ${payload} (sequence nr = ${sequenceNr}, num replicated = ${numReplicated})") channel ! Deliver(Persistent(s"replicated-${payload}"), destination.path) } + } class ExampleDestination extends Actor {