diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java index 4f70f830fc..60e1376338 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java @@ -22,9 +22,9 @@ public class PersistenceDocTest { public interface SomeOtherMessage {} public interface ProcessorMethods { - //#processor-id - public String processorId(); - //#processor-id + //#persistence-id + public String persistenceId(); + //#persistence-id //#recovery-status public boolean recoveryRunning(); public boolean recoveryFinished(); @@ -120,12 +120,12 @@ public class PersistenceDocTest { } class MyProcessor4 extends UntypedProcessor implements ProcessorMethods { - //#processor-id-override + //#persistence-id-override @Override - public String processorId() { - return "my-stable-processor-id"; + public String persistenceId() { + return "my-stable-persistence-id"; } - //#processor-id-override + //#persistence-id-override @Override public void onReceive(Object message) throws Exception {} } @@ -488,8 +488,8 @@ public class PersistenceDocTest { //#view class MyView extends UntypedView { @Override - public String processorId() { - return "some-processor-id"; + public String persistenceId() { + return "some-persistence-id"; } @Override diff --git a/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java index b40527ec3e..82dc3e9c42 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java @@ -53,7 +53,7 @@ public class PersistencePluginDocTest { class MySnapshotStore extends SnapshotStore { @Override - public Future> doLoadAsync(String processorId, SnapshotSelectionCriteria criteria) { + public Future> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria) { return null; } @@ -71,7 +71,7 @@ public class PersistencePluginDocTest { } @Override - public void doDelete(String processorId, SnapshotSelectionCriteria criteria) throws Exception { + public void doDelete(String persistenceId, SnapshotSelectionCriteria criteria) throws Exception { } } @@ -87,22 +87,22 @@ public class PersistencePluginDocTest { } @Override - public Future doAsyncDeleteMessages(Iterable messageIds, boolean permanent) { + public Future doAsyncDeleteMessages(Iterable messageIds, boolean permanent) { return null; } @Override - public Future doAsyncDeleteMessagesTo(String processorId, long toSequenceNr, boolean permanent) { + public Future doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent) { return null; } @Override - public Future doAsyncReplayMessages(String processorId, long fromSequenceNr, long toSequenceNr, long max, Procedure replayCallback) { + public Future doAsyncReplayMessages(String persistenceId, long fromSequenceNr, long toSequenceNr, long max, Procedure replayCallback) { return null; } @Override - public Future doAsyncReadHighestSequenceNr(String processorId, long fromSequenceNr) { + public Future doAsyncReadHighestSequenceNr(String persistenceId, long fromSequenceNr) { return null; } } diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index 4745b763e1..a8c77b5e98 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -161,30 +161,30 @@ Identifiers ----------- A processor must have an identifier that doesn't change across different actor incarnations. It defaults to the -``String`` representation of processor's path without the address part and can be obtained via the ``processorId`` +``String`` representation of processor's path without the address part and can be obtained via the ``persistenceId`` method. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#processor-id +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#persistence-id Applications can customize a processor's id by specifying an actor name during processor creation as shown in section :ref:`processors-java`. This changes that processor's name in its actor hierarchy and hence influences only -part of the processor id. To fully customize a processor's id, the ``processorId`` method must be overridden. +part of the processor id. To fully customize a processor's id, the ``persistenceId`` method must be overridden. -.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#processor-id-override +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#persistence-id-override -Overriding ``processorId`` is the recommended way to generate stable identifiers. +Overriding ``persistenceId`` is the recommended way to generate stable identifiers. .. _views-java-lambda: Views ===== -Views can be implemented by extending the ``AbstractView`` abstract class, implement the ``processorId`` method +Views can be implemented by extending the ``AbstractView`` abstract class, implement the ``persistenceId`` method and setting the “initial behavior” in the constructor by calling the :meth:`receive` method. .. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#view -The ``processorId`` identifies the processor from which the view receives journaled messages. It is not necessary +The ``persistenceId`` identifies the processor from which the view receives journaled messages. It is not necessary the referenced processor is actually running. Views read messages from a processor's journal directly. When a processor is started later and begins to write new messages, the corresponding view is updated automatically, by default. @@ -234,7 +234,7 @@ Applications can customize a view's id by specifying an actor name during view c name in its actor hierarchy and hence influences only part of the view id. To fully customize a view's id, the ``viewId`` method must be overridden. Overriding ``viewId`` is the recommended way to generate stable identifiers. -The ``viewId`` must differ from the referenced ``processorId``, unless :ref:`snapshots-java` of a view and its +The ``viewId`` must differ from the referenced ``persistenceId``, unless :ref:`snapshots-java` of a view and its processor shall be shared (which is what applications usually do not want). .. _channels-java-lambda: diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 804b1b1aae..b51a8c844b 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -175,30 +175,30 @@ Identifiers ----------- A processor must have an identifier that doesn't change across different actor incarnations. It defaults to the -``String`` representation of processor's path without the address part and can be obtained via the ``processorId`` +``String`` representation of processor's path without the address part and can be obtained via the ``persistenceId`` method. -.. includecode:: code/docs/persistence/PersistenceDocTest.java#processor-id +.. includecode:: code/docs/persistence/PersistenceDocTest.java#persistence-id Applications can customize a processor's id by specifying an actor name during processor creation as shown in section :ref:`processors-java`. This changes that processor's name in its actor hierarchy and hence influences only -part of the processor id. To fully customize a processor's id, the ``processorId`` method must be overridden. +part of the processor id. To fully customize a processor's id, the ``persistenceId`` method must be overridden. -.. includecode:: code/docs/persistence/PersistenceDocTest.java#processor-id-override +.. includecode:: code/docs/persistence/PersistenceDocTest.java#persistence-id-override -Overriding ``processorId`` is the recommended way to generate stable identifiers. +Overriding ``persistenceId`` is the recommended way to generate stable identifiers. .. _views-java: Views ===== -Views can be implemented by extending the ``UntypedView`` trait and implementing the ``onReceive`` and the ``processorId`` +Views can be implemented by extending the ``UntypedView`` trait and implementing the ``onReceive`` and the ``persistenceId`` methods. .. includecode:: code/docs/persistence/PersistenceDocTest.java#view -The ``processorId`` identifies the processor from which the view receives journaled messages. It is not necessary +The ``persistenceId`` identifies the processor from which the view receives journaled messages. It is not necessary the referenced processor is actually running. Views read messages from a processor's journal directly. When a processor is started later and begins to write new messages, the corresponding view is updated automatically, by default. @@ -248,7 +248,7 @@ Applications can customize a view's id by specifying an actor name during view c name in its actor hierarchy and hence influences only part of the view id. To fully customize a view's id, the ``viewId`` method must be overridden. Overriding ``viewId`` is the recommended way to generate stable identifiers. -The ``viewId`` must differ from the referenced ``processorId``, unless :ref:`snapshots-java` of a view and its +The ``viewId`` must differ from the referenced ``persistenceId``, unless :ref:`snapshots-java` of a view and its processor shall be shared (which is what applications usually do not want). .. _channels-java: diff --git a/akka-docs/rst/project/migration-guide-eventsourced-2.3.x.rst b/akka-docs/rst/project/migration-guide-eventsourced-2.3.x.rst index 879c73e13b..d7a97a6d72 100644 --- a/akka-docs/rst/project/migration-guide-eventsourced-2.3.x.rst +++ b/akka-docs/rst/project/migration-guide-eventsourced-2.3.x.rst @@ -84,7 +84,7 @@ Processors channel in order to avoid redundant replies during replay. Sender references of type ``PromiseActorRef`` are not journaled, they are ``system.deadLetters`` on replay. - Supports :ref:`snapshots`. -- :ref:`processor-identifiers` are of type ``String``, have a default value and can be overridden by applications. +- :ref:`persistence-identifiers` are of type ``String``, have a default value and can be overridden by applications. - Supports :ref:`batch-writes`. - Supports stashing of messages. diff --git a/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst index 937a378daa..ee0b86e1e7 100644 --- a/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst @@ -21,7 +21,42 @@ To extend ``PersistentActor``:: class NewPersistentProcessor extends PersistentActor { /*...*/ } -No other API changes are required for this migration. + +Renamed processorId to persistenceId +==================================== +In Akka Persistence ``2.3.3`` and previously, the main building block of applications were Processors. +Persistent messages, as well as processors implemented the ``processorId`` method to identify which persistent entity a message belonged to. + +This concept remains the same in Akka ``2.3.4``, yet we rename ``processorId`` to ``persistenceId`` because Processors will be removed, +and persistent messages can be used from different classes not only ``PersistentActor`` (Views, directly from Journals etc). + +We provided the renamed method also on already deprecated classes (Channels), so you can simply apply a global rename of ``processorId`` to ``persistenceId``. + +Plugin APIs: Renamed PersistentId to PersistenceId +================================================== +Following the removal of Processors and moving to ``persistenceId``, the plugin SPI visible type has changed. +The move from ``2.3.3`` to ``2.3.4`` should be relatively painless, and plugins will work even when using the deprecated ``PersistentId`` type. + +Change your implementations from:: + + def asyncWriteMessages(messages: immutable.Seq[PersistentRepr]): Future[Unit] = // ... + + def asyncDeleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean): Future[Unit] = { + val p = messageIds.head.processorId // old + // ... + } + +to:: + + def asyncWriteMessages(messages: immutable.Seq[PersistentRepr]): Future[Unit] = // ... + + def asyncDeleteMessages(messageIds: immutable.Seq[PersistenceId], permanent: Boolean): Future[Unit] = { + val p = messageIds.head.persistenceId // new + // ... + } + +Plugins written for ``2.3.3`` are source level compatible with ``2.3.4``, using the deprecated types, but will not work with future releases. +Plugin maintainers are asked to update their plugins to ``2.3.4`` as soon as possible. Removed Processor in favour of extending PersistentActor with persistAsync ========================================================================== diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala index 525471355d..42515eeb92 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala @@ -111,9 +111,9 @@ trait PersistenceDocSpec { new AnyRef { trait ProcessorMethods { - //#processor-id - def processorId: String - //#processor-id + //#persistence-id + def persistenceId: String + //#persistence-id //#recovery-status def recoveryRunning: Boolean def recoveryFinished: Boolean @@ -123,9 +123,9 @@ trait PersistenceDocSpec { //#current-message } class MyProcessor1 extends Processor with ProcessorMethods { - //#processor-id-override - override def processorId = "my-stable-processor-id" - //#processor-id-override + //#persistence-id-override + override def persistenceId = "my-stable-persistence-id" + //#persistence-id-override def receive = { case _ => } @@ -411,7 +411,7 @@ trait PersistenceDocSpec { //#view class MyView extends View { - def processorId: String = "some-processor-id" + override def persistenceId: String = "some-persistence-id" def receive: Actor.Receive = { case Persistent(payload, sequenceNr) => // ... @@ -440,26 +440,26 @@ trait PersistenceDocSpec { val materializer = FlowMaterializer(MaterializerSettings()) - val flow: Flow[Persistent] = PersistentFlow.fromProcessor("some-processor-id") + val flow: Flow[Persistent] = PersistentFlow.fromPersistence("some-persistence-id") val producer: Producer[Persistent] = flow.toProducer(materializer) //#producer-creation //#producer-buffer-size - PersistentFlow.fromProcessor("some-processor-id", PersistentPublisherSettings(maxBufferSize = 200)) + PersistentFlow.fromPersistence("some-persistence-id", PersistentPublisherSettings(maxBufferSize = 200)) //#producer-buffer-size //#producer-examples // 1 producer and 2 consumers: val producer1: Producer[Persistent] = - PersistentFlow.fromProcessor("processor-1").toProducer(materializer) + PersistentFlow.fromPersistence("processor-1").toProducer(materializer) Flow(producer1).foreach(p => println(s"consumer-1: ${p.payload}")).consume(materializer) Flow(producer1).foreach(p => println(s"consumer-2: ${p.payload}")).consume(materializer) // 2 producers (merged) and 1 consumer: val producer2: Producer[Persistent] = - PersistentFlow.fromProcessor("processor-2").toProducer(materializer) + PersistentFlow.fromPersistence("processor-2").toProducer(materializer) val producer3: Producer[Persistent] = - PersistentFlow.fromProcessor("processor-3").toProducer(materializer) + PersistentFlow.fromPersistence("processor-3").toProducer(materializer) Flow(producer2).merge(producer3).foreach { p => println(s"consumer-3: ${p.payload}") }.consume(materializer) diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala index 7419f5e8af..7a32f42010 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala @@ -126,16 +126,16 @@ trait SharedLeveldbPluginDocSpec { class MyJournal extends AsyncWriteJournal { def asyncWriteMessages(messages: Seq[PersistentRepr]): Future[Unit] = ??? def asyncWriteConfirmations(confirmations: Seq[PersistentConfirmation]): Future[Unit] = ??? - def asyncDeleteMessages(messageIds: Seq[PersistentId], permanent: Boolean): Future[Unit] = ??? - def asyncDeleteMessagesTo(processorId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit] = ??? - def asyncReplayMessages(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: (PersistentRepr) => Unit): Future[Unit] = ??? - def asyncReadHighestSequenceNr(processorId: String, fromSequenceNr: Long): Future[Long] = ??? + def asyncDeleteMessages(messageIds: Seq[PersistenceId], permanent: Boolean): Future[Unit] = ??? + def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit] = ??? + def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: (PersistentRepr) => Unit): Future[Unit] = ??? + def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = ??? } class MySnapshotStore extends SnapshotStore { - def loadAsync(processorId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = ??? + def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = ??? def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = ??? def saved(metadata: SnapshotMetadata): Unit = ??? def delete(metadata: SnapshotMetadata): Unit = ??? - def delete(processorId: String, criteria: SnapshotSelectionCriteria): Unit = ??? + def delete(persistenceId: String, criteria: SnapshotSelectionCriteria): Unit = ??? } diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 2fa7805d86..f3323dbb43 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -165,30 +165,30 @@ Identifiers ----------- A processor must have an identifier that doesn't change across different actor incarnations. It defaults to the -``String`` representation of processor's path without the address part and can be obtained via the ``processorId`` +``String`` representation of processor's path without the address part and can be obtained via the ``persistenceId`` method. -.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#processor-id +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#persistence-id Applications can customize a processor's id by specifying an actor name during processor creation as shown in section :ref:`processors`. This changes that processor's name in its actor hierarchy and hence influences only -part of the processor id. To fully customize a processor's id, the ``processorId`` method must be overridden. +part of the processor id. To fully customize a processor's id, the ``persistenceId`` method must be overridden. -.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#processor-id-override +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#persistence-id-override -Overriding ``processorId`` is the recommended way to generate stable identifiers. +Overriding ``persistenceId`` is the recommended way to generate stable identifiers. .. _views: Views ===== -Views can be implemented by extending the ``View`` trait and implementing the ``receive`` and the ``processorId`` +Views can be implemented by extending the ``View`` trait and implementing the ``receive`` and the ``persistenceId`` methods. .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#view -The ``processorId`` identifies the processor from which the view receives journaled messages. It is not necessary +The ``persistenceId`` identifies the processor from which the view receives journaled messages. It is not necessary the referenced processor is actually running. Views read messages from a processor's journal directly. When a processor is started later and begins to write new messages, the corresponding view is updated automatically, by default. @@ -238,7 +238,7 @@ Applications can customize a view's id by specifying an actor name during view c name in its actor hierarchy and hence influences only part of the view id. To fully customize a view's id, the ``viewId`` method must be overridden. Overriding ``viewId`` is the recommended way to generate stable identifiers. -The ``viewId`` must differ from the referenced ``processorId``, unless :ref:`snapshots` of a view and its +The ``viewId`` must differ from the referenced ``persistenceId``, unless :ref:`snapshots` of a view and its processor shall be shared (which is what applications usually do not want). .. _channels: @@ -378,7 +378,7 @@ creating the channel with the ``replyPersistent`` configuration parameter set to With this setting, either the successfully persisted message is replied to the sender or a ``PersistenceFailure`` message. In case the latter case, the sender should re-send the message. -.. _processor-identifiers: +.. _persistence-identifiers: Identifiers ----------- diff --git a/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncRecoveryPlugin.java b/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncRecoveryPlugin.java index 36cfad6733..d193f64c27 100644 --- a/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncRecoveryPlugin.java +++ b/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncRecoveryPlugin.java @@ -25,23 +25,23 @@ interface AsyncRecoveryPlugin { * The channel ids of delivery confirmations that are available for a replayed * message must be contained in that message's `confirms` sequence. * - * @param processorId processor id. + * @param persistenceId processor id. * @param fromSequenceNr sequence number where replay should start (inclusive). * @param toSequenceNr sequence number where replay should end (inclusive). * @param max maximum number of messages to be replayed. * @param replayCallback called to replay a single message. Can be called from any * thread. */ - Future doAsyncReplayMessages(String processorId, long fromSequenceNr, long toSequenceNr, long max, Procedure replayCallback); + Future doAsyncReplayMessages(String persistenceId, long fromSequenceNr, long toSequenceNr, long max, Procedure replayCallback); /** * Java API, Plugin API: asynchronously reads the highest stored sequence number - * for the given `processorId`. + * for the given `persistenceId`. * - * @param processorId processor id. + * @param persistenceId processor id. * @param fromSequenceNr hint where to start searching for the highest sequence * number. */ - Future doAsyncReadHighestSequenceNr(String processorId, long fromSequenceNr); + Future doAsyncReadHighestSequenceNr(String persistenceId, long fromSequenceNr); //#async-replay-plugin-api } diff --git a/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java b/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java index 838e957de6..638656ccb1 100644 --- a/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java +++ b/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java @@ -28,7 +28,7 @@ interface AsyncWritePlugin { * from the journal. If `permanent` is set to `false`, the persistent messages are * marked as deleted, otherwise they are permanently deleted. */ - Future doAsyncDeleteMessages(Iterable messageIds, boolean permanent); + Future doAsyncDeleteMessages(Iterable messageIds, boolean permanent); /** * Java API, Plugin API: synchronously deletes all persistent messages up to @@ -37,6 +37,6 @@ interface AsyncWritePlugin { * * @see AsyncRecoveryPlugin */ - Future doAsyncDeleteMessagesTo(String processorId, long toSequenceNr, boolean permanent); + Future doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent); //#async-write-plugin-api } diff --git a/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java b/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java index 39f52460ab..86600827df 100644 --- a/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java +++ b/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java @@ -26,7 +26,7 @@ interface SyncWritePlugin { * from the journal. If `permanent` is set to `false`, the persistent messages are * marked as deleted, otherwise they are permanently deleted. */ - void doDeleteMessages(Iterable messageIds, boolean permanent); + void doDeleteMessages(Iterable messageIds, boolean permanent); /** * Java API, Plugin API: synchronously deletes all persistent messages up to @@ -35,6 +35,6 @@ interface SyncWritePlugin { * * @see AsyncRecoveryPlugin */ - void doDeleteMessagesTo(String processorId, long toSequenceNr, boolean permanent); + void doDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent); //#sync-write-plugin-api } diff --git a/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java b/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java index 340c98e995..1d5169b9d6 100644 --- a/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java +++ b/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java @@ -721,20 +721,20 @@ public final class MessageFormats { */ long getSequenceNr(); - // optional string processorId = 3; + // optional string persistenceId = 3; /** - * optional string processorId = 3; + * optional string persistenceId = 3; */ - boolean hasProcessorId(); + boolean hasPersistenceId(); /** - * optional string processorId = 3; + * optional string persistenceId = 3; */ - java.lang.String getProcessorId(); + java.lang.String getPersistenceId(); /** - * optional string processorId = 3; + * optional string persistenceId = 3; */ com.google.protobuf.ByteString - getProcessorIdBytes(); + getPersistenceIdBytes(); // optional bool deleted = 4; /** @@ -901,7 +901,7 @@ public final class MessageFormats { } case 26: { bitField0_ |= 0x00000004; - processorId_ = input.readBytes(); + persistenceId_ = input.readBytes(); break; } case 32: { @@ -1031,20 +1031,20 @@ public final class MessageFormats { return sequenceNr_; } - // optional string processorId = 3; - public static final int PROCESSORID_FIELD_NUMBER = 3; - private java.lang.Object processorId_; + // optional string persistenceId = 3; + public static final int PersistenceId_FIELD_NUMBER = 3; + private java.lang.Object persistenceId_; /** - * optional string processorId = 3; + * optional string persistenceId = 3; */ - public boolean hasProcessorId() { + public boolean hasPersistenceId() { return ((bitField0_ & 0x00000004) == 0x00000004); } /** - * optional string processorId = 3; + * optional string persistenceId = 3; */ - public java.lang.String getProcessorId() { - java.lang.Object ref = processorId_; + public java.lang.String getPersistenceId() { + java.lang.Object ref = persistenceId_; if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { @@ -1052,22 +1052,22 @@ public final class MessageFormats { (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { - processorId_ = s; + persistenceId_ = s; } return s; } } /** - * optional string processorId = 3; + * optional string persistenceId = 3; */ public com.google.protobuf.ByteString - getProcessorIdBytes() { - java.lang.Object ref = processorId_; + getPersistenceIdBytes() { + java.lang.Object ref = persistenceId_; if (ref instanceof java.lang.String) { com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); - processorId_ = b; + persistenceId_ = b; return b; } else { return (com.google.protobuf.ByteString) ref; @@ -1263,7 +1263,7 @@ public final class MessageFormats { private void initFields() { payload_ = akka.persistence.serialization.MessageFormats.PersistentPayload.getDefaultInstance(); sequenceNr_ = 0L; - processorId_ = ""; + persistenceId_ = ""; deleted_ = false; redeliveries_ = 0; confirms_ = com.google.protobuf.LazyStringArrayList.EMPTY; @@ -1297,7 +1297,7 @@ public final class MessageFormats { output.writeInt64(2, sequenceNr_); } if (((bitField0_ & 0x00000004) == 0x00000004)) { - output.writeBytes(3, getProcessorIdBytes()); + output.writeBytes(3, getPersistenceIdBytes()); } if (((bitField0_ & 0x00000008) == 0x00000008)) { output.writeBool(4, deleted_); @@ -1339,7 +1339,7 @@ public final class MessageFormats { } if (((bitField0_ & 0x00000004) == 0x00000004)) { size += com.google.protobuf.CodedOutputStream - .computeBytesSize(3, getProcessorIdBytes()); + .computeBytesSize(3, getPersistenceIdBytes()); } if (((bitField0_ & 0x00000008) == 0x00000008)) { size += com.google.protobuf.CodedOutputStream @@ -1500,7 +1500,7 @@ public final class MessageFormats { bitField0_ = (bitField0_ & ~0x00000001); sequenceNr_ = 0L; bitField0_ = (bitField0_ & ~0x00000002); - processorId_ = ""; + persistenceId_ = ""; bitField0_ = (bitField0_ & ~0x00000004); deleted_ = false; bitField0_ = (bitField0_ & ~0x00000008); @@ -1563,7 +1563,7 @@ public final class MessageFormats { if (((from_bitField0_ & 0x00000004) == 0x00000004)) { to_bitField0_ |= 0x00000004; } - result.processorId_ = processorId_; + result.persistenceId_ = persistenceId_; if (((from_bitField0_ & 0x00000008) == 0x00000008)) { to_bitField0_ |= 0x00000008; } @@ -1620,9 +1620,9 @@ public final class MessageFormats { if (other.hasSequenceNr()) { setSequenceNr(other.getSequenceNr()); } - if (other.hasProcessorId()) { + if (other.hasPersistenceId()) { bitField0_ |= 0x00000004; - processorId_ = other.processorId_; + persistenceId_ = other.persistenceId_; onChanged(); } if (other.hasDeleted()) { @@ -1840,76 +1840,76 @@ public final class MessageFormats { return this; } - // optional string processorId = 3; - private java.lang.Object processorId_ = ""; + // optional string persistenceId = 3; + private java.lang.Object persistenceId_ = ""; /** - * optional string processorId = 3; + * optional string persistenceId = 3; */ - public boolean hasProcessorId() { + public boolean hasPersistenceId() { return ((bitField0_ & 0x00000004) == 0x00000004); } /** - * optional string processorId = 3; + * optional string persistenceId = 3; */ - public java.lang.String getProcessorId() { - java.lang.Object ref = processorId_; + public java.lang.String getPersistenceId() { + java.lang.Object ref = persistenceId_; if (!(ref instanceof java.lang.String)) { java.lang.String s = ((com.google.protobuf.ByteString) ref) .toStringUtf8(); - processorId_ = s; + persistenceId_ = s; return s; } else { return (java.lang.String) ref; } } /** - * optional string processorId = 3; + * optional string persistenceId = 3; */ public com.google.protobuf.ByteString - getProcessorIdBytes() { - java.lang.Object ref = processorId_; + getPersistenceIdBytes() { + java.lang.Object ref = persistenceId_; if (ref instanceof String) { com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); - processorId_ = b; + persistenceId_ = b; return b; } else { return (com.google.protobuf.ByteString) ref; } } /** - * optional string processorId = 3; + * optional string persistenceId = 3; */ - public Builder setProcessorId( + public Builder setPersistenceId( java.lang.String value) { if (value == null) { throw new NullPointerException(); } bitField0_ |= 0x00000004; - processorId_ = value; + persistenceId_ = value; onChanged(); return this; } /** - * optional string processorId = 3; + * optional string persistenceId = 3; */ - public Builder clearProcessorId() { + public Builder clearPersistenceId() { bitField0_ = (bitField0_ & ~0x00000004); - processorId_ = getDefaultInstance().getProcessorId(); + persistenceId_ = getDefaultInstance().getPersistenceId(); onChanged(); return this; } /** - * optional string processorId = 3; + * optional string persistenceId = 3; */ - public Builder setProcessorIdBytes( + public Builder setPersistenceIdBytes( com.google.protobuf.ByteString value) { if (value == null) { throw new NullPointerException(); } bitField0_ |= 0x00000004; - processorId_ = value; + persistenceId_ = value; onChanged(); return this; } @@ -2965,20 +2965,20 @@ public final class MessageFormats { public interface DeliveredMessageOrBuilder extends com.google.protobuf.MessageOrBuilder { - // optional string processorId = 1; + // optional string persistenceId = 1; /** - * optional string processorId = 1; + * optional string persistenceId = 1; */ - boolean hasProcessorId(); + boolean hasPersistenceId(); /** - * optional string processorId = 1; + * optional string persistenceId = 1; */ - java.lang.String getProcessorId(); + java.lang.String getPersistenceId(); /** - * optional string processorId = 1; + * optional string persistenceId = 1; */ com.google.protobuf.ByteString - getProcessorIdBytes(); + getPersistenceIdBytes(); // optional string channelId = 2; /** @@ -3083,7 +3083,7 @@ public final class MessageFormats { } case 10: { bitField0_ |= 0x00000001; - processorId_ = input.readBytes(); + persistenceId_ = input.readBytes(); break; } case 18: { @@ -3146,20 +3146,20 @@ public final class MessageFormats { } private int bitField0_; - // optional string processorId = 1; - public static final int PROCESSORID_FIELD_NUMBER = 1; - private java.lang.Object processorId_; + // optional string persistenceId = 1; + public static final int PersistenceId_FIELD_NUMBER = 1; + private java.lang.Object persistenceId_; /** - * optional string processorId = 1; + * optional string persistenceId = 1; */ - public boolean hasProcessorId() { + public boolean hasPersistenceId() { return ((bitField0_ & 0x00000001) == 0x00000001); } /** - * optional string processorId = 1; + * optional string persistenceId = 1; */ - public java.lang.String getProcessorId() { - java.lang.Object ref = processorId_; + public java.lang.String getPersistenceId() { + java.lang.Object ref = persistenceId_; if (ref instanceof java.lang.String) { return (java.lang.String) ref; } else { @@ -3167,22 +3167,22 @@ public final class MessageFormats { (com.google.protobuf.ByteString) ref; java.lang.String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { - processorId_ = s; + persistenceId_ = s; } return s; } } /** - * optional string processorId = 1; + * optional string persistenceId = 1; */ public com.google.protobuf.ByteString - getProcessorIdBytes() { - java.lang.Object ref = processorId_; + getPersistenceIdBytes() { + java.lang.Object ref = persistenceId_; if (ref instanceof java.lang.String) { com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); - processorId_ = b; + persistenceId_ = b; return b; } else { return (com.google.protobuf.ByteString) ref; @@ -3308,7 +3308,7 @@ public final class MessageFormats { } private void initFields() { - processorId_ = ""; + persistenceId_ = ""; channelId_ = ""; persistentSequenceNr_ = 0L; deliverySequenceNr_ = 0L; @@ -3327,7 +3327,7 @@ public final class MessageFormats { throws java.io.IOException { getSerializedSize(); if (((bitField0_ & 0x00000001) == 0x00000001)) { - output.writeBytes(1, getProcessorIdBytes()); + output.writeBytes(1, getPersistenceIdBytes()); } if (((bitField0_ & 0x00000002) == 0x00000002)) { output.writeBytes(2, getChannelIdBytes()); @@ -3352,7 +3352,7 @@ public final class MessageFormats { size = 0; if (((bitField0_ & 0x00000001) == 0x00000001)) { size += com.google.protobuf.CodedOutputStream - .computeBytesSize(1, getProcessorIdBytes()); + .computeBytesSize(1, getPersistenceIdBytes()); } if (((bitField0_ & 0x00000002) == 0x00000002)) { size += com.google.protobuf.CodedOutputStream @@ -3486,7 +3486,7 @@ public final class MessageFormats { public Builder clear() { super.clear(); - processorId_ = ""; + persistenceId_ = ""; bitField0_ = (bitField0_ & ~0x00000001); channelId_ = ""; bitField0_ = (bitField0_ & ~0x00000002); @@ -3527,7 +3527,7 @@ public final class MessageFormats { if (((from_bitField0_ & 0x00000001) == 0x00000001)) { to_bitField0_ |= 0x00000001; } - result.processorId_ = processorId_; + result.persistenceId_ = persistenceId_; if (((from_bitField0_ & 0x00000002) == 0x00000002)) { to_bitField0_ |= 0x00000002; } @@ -3560,9 +3560,9 @@ public final class MessageFormats { public Builder mergeFrom(akka.persistence.serialization.MessageFormats.DeliveredMessage other) { if (other == akka.persistence.serialization.MessageFormats.DeliveredMessage.getDefaultInstance()) return this; - if (other.hasProcessorId()) { + if (other.hasPersistenceId()) { bitField0_ |= 0x00000001; - processorId_ = other.processorId_; + persistenceId_ = other.persistenceId_; onChanged(); } if (other.hasChannelId()) { @@ -3608,76 +3608,76 @@ public final class MessageFormats { } private int bitField0_; - // optional string processorId = 1; - private java.lang.Object processorId_ = ""; + // optional string persistenceId = 1; + private java.lang.Object persistenceId_ = ""; /** - * optional string processorId = 1; + * optional string persistenceId = 1; */ - public boolean hasProcessorId() { + public boolean hasPersistenceId() { return ((bitField0_ & 0x00000001) == 0x00000001); } /** - * optional string processorId = 1; + * optional string persistenceId = 1; */ - public java.lang.String getProcessorId() { - java.lang.Object ref = processorId_; + public java.lang.String getPersistenceId() { + java.lang.Object ref = persistenceId_; if (!(ref instanceof java.lang.String)) { java.lang.String s = ((com.google.protobuf.ByteString) ref) .toStringUtf8(); - processorId_ = s; + persistenceId_ = s; return s; } else { return (java.lang.String) ref; } } /** - * optional string processorId = 1; + * optional string persistenceId = 1; */ public com.google.protobuf.ByteString - getProcessorIdBytes() { - java.lang.Object ref = processorId_; + getPersistenceIdBytes() { + java.lang.Object ref = persistenceId_; if (ref instanceof String) { com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (java.lang.String) ref); - processorId_ = b; + persistenceId_ = b; return b; } else { return (com.google.protobuf.ByteString) ref; } } /** - * optional string processorId = 1; + * optional string persistenceId = 1; */ - public Builder setProcessorId( + public Builder setPersistenceId( java.lang.String value) { if (value == null) { throw new NullPointerException(); } bitField0_ |= 0x00000001; - processorId_ = value; + persistenceId_ = value; onChanged(); return this; } /** - * optional string processorId = 1; + * optional string persistenceId = 1; */ - public Builder clearProcessorId() { + public Builder clearPersistenceId() { bitField0_ = (bitField0_ & ~0x00000001); - processorId_ = getDefaultInstance().getProcessorId(); + persistenceId_ = getDefaultInstance().getPersistenceId(); onChanged(); return this; } /** - * optional string processorId = 1; + * optional string persistenceId = 1; */ - public Builder setProcessorIdBytes( + public Builder setPersistenceIdBytes( com.google.protobuf.ByteString value) { if (value == null) { throw new NullPointerException(); } bitField0_ |= 0x00000001; - processorId_ = value; + persistenceId_ = value; onChanged(); return this; } @@ -4620,14 +4620,14 @@ public final class MessageFormats { "ageBatch\022!\n\005batch\030\001 \003(\0132\022.PersistentMess" + "age\"\201\002\n\021PersistentMessage\022#\n\007payload\030\001 \001" + "(\0132\022.PersistentPayload\022\022\n\nsequenceNr\030\002 \001" + - "(\003\022\023\n\013processorId\030\003 \001(\t\022\017\n\007deleted\030\004 \001(\010" + + "(\003\022\023\n\013persistenceId\030\003 \001(\t\022\017\n\007deleted\030\004 \001(\010" + "\022\024\n\014redeliveries\030\006 \001(\005\022\020\n\010confirms\030\007 \003(\t" + "\022\023\n\013confirmable\030\010 \001(\010\022)\n\016confirmMessage\030" + "\t \001(\0132\021.DeliveredMessage\022\025\n\rconfirmTarge" + "t\030\n \001(\t\022\016\n\006sender\030\013 \001(\t\"S\n\021PersistentPay" + "load\022\024\n\014serializerId\030\001 \002(\005\022\017\n\007payload\030\002 ", "\002(\014\022\027\n\017payloadManifest\030\003 \001(\014\"\205\001\n\020Deliver" + - "edMessage\022\023\n\013processorId\030\001 \001(\t\022\021\n\tchanne" + + "edMessage\022\023\n\013persistenceId\030\001 \001(\t\022\021\n\tchanne" + "lId\030\002 \001(\t\022\034\n\024persistentSequenceNr\030\003 \001(\003\022" + "\032\n\022deliverySequenceNr\030\004 \001(\003\022\017\n\007channel\030\005" + " \001(\t\"M\n\016DeliverMessage\022&\n\npersistent\030\001 \001" + @@ -4650,7 +4650,7 @@ public final class MessageFormats { internal_static_PersistentMessage_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_PersistentMessage_descriptor, - new java.lang.String[] { "Payload", "SequenceNr", "ProcessorId", "Deleted", "Redeliveries", "Confirms", "Confirmable", "ConfirmMessage", "ConfirmTarget", "Sender", }); + new java.lang.String[] { "Payload", "SequenceNr", "PersistenceId", "Deleted", "Redeliveries", "Confirms", "Confirmable", "ConfirmMessage", "ConfirmTarget", "Sender", }); internal_static_PersistentPayload_descriptor = getDescriptor().getMessageTypes().get(2); internal_static_PersistentPayload_fieldAccessorTable = new @@ -4662,7 +4662,7 @@ public final class MessageFormats { internal_static_DeliveredMessage_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_DeliveredMessage_descriptor, - new java.lang.String[] { "ProcessorId", "ChannelId", "PersistentSequenceNr", "DeliverySequenceNr", "Channel", }); + new java.lang.String[] { "PersistenceId", "ChannelId", "PersistentSequenceNr", "DeliverySequenceNr", "Channel", }); internal_static_DeliverMessage_descriptor = getDescriptor().getMessageTypes().get(4); internal_static_DeliverMessage_fieldAccessorTable = new diff --git a/akka-persistence/src/main/java/akka/persistence/snapshot/japi/SnapshotStorePlugin.java b/akka-persistence/src/main/java/akka/persistence/snapshot/japi/SnapshotStorePlugin.java index c96292e53f..3b506ce87f 100644 --- a/akka-persistence/src/main/java/akka/persistence/snapshot/japi/SnapshotStorePlugin.java +++ b/akka-persistence/src/main/java/akka/persistence/snapshot/japi/SnapshotStorePlugin.java @@ -14,10 +14,10 @@ interface SnapshotStorePlugin { /** * Java API, Plugin API: asynchronously loads a snapshot. * - * @param processorId processor id. + * @param persistenceId processor id. * @param criteria selection criteria for loading. */ - Future> doLoadAsync(String processorId, SnapshotSelectionCriteria criteria); + Future> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria); /** * Java API, Plugin API: asynchronously saves a snapshot. @@ -44,9 +44,9 @@ interface SnapshotStorePlugin { /** * Java API, Plugin API: deletes all snapshots matching `criteria`. * - * @param processorId processor id. + * @param persistenceId processor id. * @param criteria selection criteria for deleting. */ - void doDelete(String processorId, SnapshotSelectionCriteria criteria) throws Exception; + void doDelete(String persistenceId, SnapshotSelectionCriteria criteria) throws Exception; //#snapshot-store-plugin-api } diff --git a/akka-persistence/src/main/protobuf/MessageFormats.proto b/akka-persistence/src/main/protobuf/MessageFormats.proto index c96747ddcc..3cc5db7276 100644 --- a/akka-persistence/src/main/protobuf/MessageFormats.proto +++ b/akka-persistence/src/main/protobuf/MessageFormats.proto @@ -12,7 +12,7 @@ message PersistentMessageBatch { message PersistentMessage { optional PersistentPayload payload = 1; optional int64 sequenceNr = 2; - optional string processorId = 3; + optional string persistenceId = 3; optional bool deleted = 4; optional int32 redeliveries = 6; repeated string confirms = 7; @@ -29,7 +29,7 @@ message PersistentPayload { } message DeliveredMessage { - optional string processorId = 1; + optional string persistenceId = 1; optional string channelId = 2; optional int64 persistentSequenceNr = 3; optional int64 deliverySequenceNr = 4; diff --git a/akka-persistence/src/main/scala/akka/persistence/Channel.scala b/akka-persistence/src/main/scala/akka/persistence/Channel.scala index 95c6c06cbe..6c85941e85 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Channel.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Channel.scala @@ -139,7 +139,7 @@ final class Channel private[akka] (_channelId: Option[String], channelSettings: private def prepareDelivery(persistent: PersistentRepr): PersistentRepr = ConfirmablePersistentImpl(persistent, confirmTarget = journal, - confirmMessage = DeliveredByChannel(persistent.processorId, id, persistent.sequenceNr, channel = self)) + confirmMessage = DeliveredByChannel(persistent.persistenceId, id, persistent.sequenceNr, channel = self)) } object Channel { @@ -216,7 +216,7 @@ trait Delivered extends Message { * Plugin API. */ final case class DeliveredByChannel( - processorId: String, + @deprecatedName('processorId) persistenceId: String, channelId: String, persistentSequenceNr: Long, deliverySequenceNr: Long = 0L, diff --git a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala index af60f07348..9753d957ae 100644 --- a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala +++ b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala @@ -18,12 +18,12 @@ private[persistence] object JournalProtocol { * Request to delete messages identified by `messageIds`. If `permanent` is set to `false`, * the persistent messages are marked as deleted, otherwise they are permanently deleted. */ - final case class DeleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean, requestor: Option[ActorRef] = None) + final case class DeleteMessages(messageIds: immutable.Seq[PersistenceId], permanent: Boolean, requestor: Option[ActorRef] = None) /** * Reply message to a successful [[DeleteMessages]] request. */ - final case class DeleteMessagesSuccess(messageIds: immutable.Seq[PersistentId]) + final case class DeleteMessagesSuccess(messageIds: immutable.Seq[PersistenceId]) /** * Reply message to a failed [[DeleteMessages]] request. @@ -35,7 +35,7 @@ private[persistence] object JournalProtocol { * (inclusive). If `permanent` is set to `false`, the persistent messages are marked * as deleted in the journal, otherwise they are permanently deleted from the journal. */ - final case class DeleteMessagesTo(processorId: String, toSequenceNr: Long, permanent: Boolean) + final case class DeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean) /** * Request to write delivery confirmations. @@ -113,11 +113,11 @@ private[persistence] object JournalProtocol { * @param fromSequenceNr sequence number where replay should start (inclusive). * @param toSequenceNr sequence number where replay should end (inclusive). * @param max maximum number of messages to be replayed. - * @param processorId requesting processor id. + * @param persistenceId requesting processor id. * @param processor requesting processor. * @param replayDeleted `true` if messages marked as deleted shall be replayed. */ - final case class ReplayMessages(fromSequenceNr: Long, toSequenceNr: Long, max: Long, processorId: String, processor: ActorRef, replayDeleted: Boolean = false) + final case class ReplayMessages(fromSequenceNr: Long, toSequenceNr: Long, max: Long, persistenceId: String, processor: ActorRef, replayDeleted: Boolean = false) /** * Reply message to a [[ReplayMessages]] request. A separate reply is sent to the requestor for each @@ -143,10 +143,10 @@ private[persistence] object JournalProtocol { * Request to read the highest stored sequence number of a given processor. * * @param fromSequenceNr optional hint where to start searching for the maximum sequence number. - * @param processorId requesting processor id. + * @param persistenceId requesting processor id. * @param processor requesting processor. */ - final case class ReadHighestSequenceNr(fromSequenceNr: Long = 1L, processorId: String, processor: ActorRef) + final case class ReadHighestSequenceNr(fromSequenceNr: Long = 1L, persistenceId: String, processor: ActorRef) /** * Reply message to a successful [[ReadHighestSequenceNr]] request. diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala index db9744e473..048e642d86 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala @@ -102,26 +102,33 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { /** * Creates a canonical processor id from a processor actor ref. */ + @deprecated("Use `persistenceId` instead. Processor will be removed.", since = "2.3.4") def processorId(processor: ActorRef): String = id(processor) + /** + * Creates a canonical persistent actor id from a processor actor ref. + */ + def persistenceId(persistentActor: ActorRef): String = id(persistentActor) + /** * Creates a canonical channel id from a channel actor ref. */ + @deprecated("Channels will be removed. You may want to use `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4") def channelId(channel: ActorRef): String = id(channel) /** - * Returns a snapshot store for a processor identified by `processorId`. + * Returns a snapshot store for a processor identified by `persistenceId`. */ - def snapshotStoreFor(processorId: String): ActorRef = { + def snapshotStoreFor(persistenceId: String): ActorRef = { // Currently returns a snapshot store singleton but this methods allows for later // optimizations where each processor can have its own snapshot store actor. snapshotStore } /** - * Returns a journal for a processor identified by `processorId`. + * Returns a journal for a processor identified by `persistenceId`. */ - def journalFor(processorId: String): ActorRef = { + def journalFor(persistenceId: String): ActorRef = { // Currently returns a journal singleton but this methods allows for later // optimizations where each processor can have its own journal actor. journal diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala index 2c957c4156..bf08fbb308 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala @@ -128,19 +128,35 @@ case class PersistentBatch(batch: immutable.Seq[Resequenceable]) extends Message * Plugin API: confirmation entry written by journal plugins. */ trait PersistentConfirmation { - def processorId: String + @deprecated("Use `persistenceId` instead. Processor will be removed.", since = "2.3.4") + final def processorId: String = persistenceId + def persistenceId: String def channelId: String def sequenceNr: Long } /** * Plugin API: persistent message identifier. + * + * Deprecated, please use [[PersistenceId]]. */ -trait PersistentId { +@deprecated("Use PersistenceId instead.", since = "2.3.4") +trait PersistentId extends PersistenceId { /** - * Id of processor that journals a persistent message + * Persistent id that journals a persistent message */ - def processorId: String + @deprecated("Use `persistenceId` instead.", since = "2.3.4") + def processorId: String = persistenceId +} + +/** + * Plugin API: persistent message identifier. + */ +trait PersistenceId { + /** + * Persistent id that journals a persistent message + */ + def persistenceId: String /** * A persistent message's sequence number. @@ -151,7 +167,7 @@ trait PersistentId { /** * INTERNAL API. */ -private[persistence] final case class PersistentIdImpl(processorId: String, sequenceNr: Long) extends PersistentId +private[persistence] final case class PersistenceIdImpl(persistenceId: String, sequenceNr: Long) extends PersistenceId /** * Plugin API: representation of a persistent message in the journal plugin API. @@ -160,7 +176,7 @@ private[persistence] final case class PersistentIdImpl(processorId: String, sequ * @see [[journal.AsyncWriteJournal]] * @see [[journal.AsyncRecovery]] */ -trait PersistentRepr extends Persistent with Resequenceable with PersistentId with Message { +trait PersistentRepr extends Persistent with Resequenceable with PersistenceId with Message { // todo we want to get rid of the Persistent() wrapper from user land; PersistentRepr is here to stay. #15230 import scala.collection.JavaConverters._ @@ -229,7 +245,7 @@ trait PersistentRepr extends Persistent with Resequenceable with PersistentId wi */ def update( sequenceNr: Long = sequenceNr, - processorId: String = processorId, + @deprecatedName('processorId) persistenceId: String = persistenceId, deleted: Boolean = deleted, redeliveries: Int = redeliveries, confirms: immutable.Seq[String] = confirms, @@ -250,7 +266,7 @@ object PersistentRepr { def apply( payload: Any, sequenceNr: Long = 0L, - processorId: String = PersistentRepr.Undefined, + @deprecatedName('processorId) persistenceId: String = PersistentRepr.Undefined, deleted: Boolean = false, redeliveries: Int = 0, confirms: immutable.Seq[String] = Nil, @@ -258,8 +274,8 @@ object PersistentRepr { confirmMessage: Delivered = null, confirmTarget: ActorRef = null, sender: ActorRef = null) = - if (confirmable) ConfirmablePersistentImpl(payload, sequenceNr, processorId, deleted, redeliveries, confirms, confirmMessage, confirmTarget, sender) - else PersistentImpl(payload, sequenceNr, processorId, deleted, confirms, sender) + if (confirmable) ConfirmablePersistentImpl(payload, sequenceNr, persistenceId, deleted, redeliveries, confirms, confirmMessage, confirmTarget, sender) + else PersistentImpl(payload, sequenceNr, persistenceId, deleted, confirms, sender) /** * Java API, Plugin API. @@ -281,7 +297,7 @@ object PersistentBatch { private[persistence] final case class PersistentImpl( payload: Any, sequenceNr: Long, - processorId: String, + @deprecatedName('processorId) persistenceId: String, deleted: Boolean, confirms: immutable.Seq[String], sender: ActorRef) extends Persistent with PersistentRepr { @@ -294,14 +310,14 @@ private[persistence] final case class PersistentImpl( def update( sequenceNr: Long, - processorId: String, + @deprecatedName('processorId) persistenceId: String, deleted: Boolean, redeliveries: Int, confirms: immutable.Seq[String], confirmMessage: Delivered, confirmTarget: ActorRef, sender: ActorRef) = - copy(sequenceNr = sequenceNr, processorId = processorId, deleted = deleted, confirms = confirms, sender = sender) + copy(sequenceNr = sequenceNr, persistenceId = persistenceId, deleted = deleted, confirms = confirms, sender = sender) val redeliveries: Int = 0 val confirmable: Boolean = false @@ -315,7 +331,7 @@ private[persistence] final case class PersistentImpl( private[persistence] final case class ConfirmablePersistentImpl( payload: Any, sequenceNr: Long, - processorId: String, + @deprecatedName('processorId) persistenceId: String, deleted: Boolean, redeliveries: Int, confirms: immutable.Seq[String], @@ -334,8 +350,8 @@ private[persistence] final case class ConfirmablePersistentImpl( def prepareWrite(sender: ActorRef) = copy(sender = sender, confirmMessage = null, confirmTarget = null) - def update(sequenceNr: Long, processorId: String, deleted: Boolean, redeliveries: Int, confirms: immutable.Seq[String], confirmMessage: Delivered, confirmTarget: ActorRef, sender: ActorRef) = - copy(sequenceNr = sequenceNr, processorId = processorId, deleted = deleted, redeliveries = redeliveries, confirms = confirms, confirmMessage = confirmMessage, confirmTarget = confirmTarget, sender = sender) + def update(sequenceNr: Long, @deprecatedName('processorId) persistenceId: String, deleted: Boolean, redeliveries: Int, confirms: immutable.Seq[String], confirmMessage: Delivered, confirmTarget: ActorRef, sender: ActorRef) = + copy(sequenceNr = sequenceNr, persistenceId = persistenceId, deleted = deleted, redeliveries = redeliveries, confirms = confirms, confirmMessage = confirmMessage, confirmTarget = confirmTarget, sender = sender) } /** @@ -343,5 +359,5 @@ private[persistence] final case class ConfirmablePersistentImpl( */ private[persistence] object ConfirmablePersistentImpl { def apply(persistent: PersistentRepr, confirmMessage: Delivered, confirmTarget: ActorRef = null): ConfirmablePersistentImpl = - ConfirmablePersistentImpl(persistent.payload, persistent.sequenceNr, persistent.processorId, persistent.deleted, persistent.redeliveries, persistent.confirms, confirmMessage, confirmTarget, persistent.sender) + ConfirmablePersistentImpl(persistent.payload, persistent.sequenceNr, persistent.persistenceId, persistent.deleted, persistent.redeliveries, persistent.confirms, confirmMessage, confirmTarget, persistent.sender) } diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistentChannel.scala b/akka-persistence/src/main/scala/akka/persistence/PersistentChannel.scala index 06c3be1c9f..5132253f58 100644 --- a/akka-persistence/src/main/scala/akka/persistence/PersistentChannel.scala +++ b/akka-persistence/src/main/scala/akka/persistence/PersistentChannel.scala @@ -183,15 +183,16 @@ object PersistentChannel { /** * Plugin API. */ -final case class DeliveredByPersistentChannel( +@deprecated("PersistentChannel will be removed, see `AtLeastOnceDelivery` instead.", since = "2.3.4") +final case class DeliveredByPersistenceChannel( channelId: String, persistentSequenceNr: Long, deliverySequenceNr: Long = 0L, - channel: ActorRef = null) extends Delivered with PersistentId { + channel: ActorRef = null) extends Delivered with PersistenceId { - def processorId: String = channelId + def persistenceId: String = channelId def sequenceNr: Long = persistentSequenceNr - def update(deliverySequenceNr: Long, channel: ActorRef): DeliveredByPersistentChannel = + def update(deliverySequenceNr: Long, channel: ActorRef): DeliveredByPersistenceChannel = copy(deliverySequenceNr = deliverySequenceNr, channel = channel) } @@ -203,25 +204,25 @@ private[persistence] class DeliveredByPersistentChannelBatching(journal: ActorRe private val batchMax = settings.journal.maxConfirmationBatchSize private var batching = false - private var batch = Vector.empty[DeliveredByPersistentChannel] + private var batch = Vector.empty[DeliveredByPersistenceChannel] def receive = { case DeleteMessagesSuccess(messageIds) ⇒ if (batch.isEmpty) batching = false else journalBatch() messageIds.foreach { - case c: DeliveredByPersistentChannel ⇒ + case c: DeliveredByPersistenceChannel ⇒ c.channel ! c if (publish) context.system.eventStream.publish(c) } case DeleteMessagesFailure(_) ⇒ if (batch.isEmpty) batching = false else journalBatch() - case d: DeliveredByPersistentChannel ⇒ + case d: DeliveredByPersistenceChannel ⇒ addToBatch(d) if (!batching || maxBatchSizeReached) journalBatch() case m ⇒ journal forward m } - def addToBatch(pc: DeliveredByPersistentChannel): Unit = + def addToBatch(pc: DeliveredByPersistenceChannel): Unit = batch = batch :+ pc def maxBatchSizeReached: Boolean = @@ -243,16 +244,16 @@ private class RequestWriter(channelId: String, channelSettings: PersistentChanne private val cbJournal = extension.confirmationBatchingJournalForChannel(channelId) - override val processorId = channelId + override val persistenceId = channelId def receive = { case p @ Persistent(Deliver(wrapped: PersistentRepr, _), _) ⇒ - if (!recoveryRunning && wrapped.processorId != PersistentRepr.Undefined) { + if (!recoveryRunning && wrapped.persistenceId != PersistentRepr.Undefined) { // Write a delivery confirmation to the journal so that replayed Deliver // requests from a sending processor are not persisted again. Replaying // Deliver requests is now the responsibility of this processor // and confirmation by destination is done to the wrapper p.sequenceNr. - cbJournal ! DeliveredByChannel(wrapped.processorId, channelId, wrapped.sequenceNr) + cbJournal ! DeliveredByChannel(wrapped.persistenceId, channelId, wrapped.sequenceNr) } if (!recoveryRunning && replyPersistent) @@ -341,8 +342,7 @@ private class RequestReader(channelId: String, channelSettings: PersistentChanne onReplayComplete() } - def processorId: String = - channelId + override def persistenceId: String = channelId def snapshotterId: String = s"${channelId}-reader" @@ -368,7 +368,7 @@ private class RequestReader(channelId: String, channelSettings: PersistentChanne private def onReadRequest(): Unit = if (_currentState == idle) { _currentState = replayStarted(await = false) - dbJournal ! ReplayMessages(lastSequenceNr + 1L, Long.MaxValue, pendingConfirmationsMax - numPending, processorId, self) + dbJournal ! ReplayMessages(lastSequenceNr + 1L, Long.MaxValue, pendingConfirmationsMax - numPending, persistenceId, self) } /** @@ -378,7 +378,7 @@ private class RequestReader(channelId: String, channelSettings: PersistentChanne private def prepareDelivery(wrapped: PersistentRepr, wrapper: PersistentRepr): PersistentRepr = { ConfirmablePersistentImpl(wrapped, confirmTarget = dbJournal, - confirmMessage = DeliveredByPersistentChannel(channelId, wrapper.sequenceNr, channel = self)) + confirmMessage = DeliveredByPersistenceChannel(channelId, wrapper.sequenceNr, channel = self)) } override def preRestart(reason: Throwable, message: Option[Any]): Unit = { diff --git a/akka-persistence/src/main/scala/akka/persistence/Processor.scala b/akka-persistence/src/main/scala/akka/persistence/Processor.scala index dd1e3f980b..7bc180e79b 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Processor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Processor.scala @@ -111,7 +111,7 @@ trait Processor extends Actor with Recovery { def addToBatch(p: Resequenceable): Unit = p match { case p: PersistentRepr ⇒ - processorBatch = processorBatch :+ p.update(processorId = processorId, sequenceNr = nextSequenceNr(), sender = sender()) + processorBatch = processorBatch :+ p.update(persistenceId = persistenceId, sequenceNr = nextSequenceNr(), sender = sender()) case r ⇒ processorBatch = processorBatch :+ r } @@ -136,7 +136,7 @@ trait Processor extends Actor with Recovery { */ private[persistence] def onReplaySuccess(receive: Receive, awaitReplay: Boolean): Unit = { _currentState = initializing - journal ! ReadHighestSequenceNr(lastSequenceNr, processorId, self) + journal ! ReadHighestSequenceNr(lastSequenceNr, persistenceId, self) } /** @@ -157,7 +157,7 @@ trait Processor extends Actor with Recovery { private def onRecoveryCompleted(receive: Receive): Unit = receive.applyOrElse(RecoveryCompleted, unhandled) - private val _processorId = extension.processorId(self) + private val _persistenceId = extension.persistenceId(self) private var processorBatch = Vector.empty[Resequenceable] private var sequenceNr: Long = 0L @@ -165,12 +165,18 @@ trait Processor extends Actor with Recovery { /** * Processor id. Defaults to this processor's path and can be overridden. */ - def processorId: String = _processorId + @deprecated("Override `persistenceId: String` instead. Processor will be removed.", since = "2.3.4") + override def processorId: String = _persistenceId // TODO: remove processorId /** - * Returns `processorId`. + * Persistence id. Defaults to this persistent-actors's path and can be overridden. */ - def snapshotterId: String = processorId + override def persistenceId: String = processorId + + /** + * Returns `persistenceId`. + */ + def snapshotterId: String = persistenceId /** * Returns `true` if this processor is currently recovering. @@ -193,7 +199,7 @@ trait Processor extends Actor with Recovery { * @param sequenceNr sequence number of the persistent message to be deleted. */ def deleteMessage(sequenceNr: Long): Unit = { - deleteMessage(sequenceNr, false) + deleteMessage(sequenceNr, permanent = false) } /** @@ -208,7 +214,7 @@ trait Processor extends Actor with Recovery { * @param permanent if `false`, the message is marked as deleted, otherwise it is permanently deleted. */ def deleteMessage(sequenceNr: Long, permanent: Boolean): Unit = { - journal ! DeleteMessages(List(PersistentIdImpl(processorId, sequenceNr)), permanent) + journal ! DeleteMessages(List(PersistenceIdImpl(persistenceId, sequenceNr)), permanent) } /** @@ -217,7 +223,7 @@ trait Processor extends Actor with Recovery { * @param toSequenceNr upper sequence number bound of persistent messages to be deleted. */ def deleteMessages(toSequenceNr: Long): Unit = { - deleteMessages(toSequenceNr, true) + deleteMessages(toSequenceNr, permanent = true) } /** @@ -229,7 +235,7 @@ trait Processor extends Actor with Recovery { * @param permanent if `false`, the message is marked as deleted, otherwise it is permanently deleted. */ def deleteMessages(toSequenceNr: Long, permanent: Boolean): Unit = { - journal ! DeleteMessagesTo(processorId, toSequenceNr, permanent) + journal ! DeleteMessagesTo(persistenceId, toSequenceNr, permanent) } /** @@ -296,13 +302,13 @@ trait Processor extends Actor with Recovery { message match { case RecoveryCompleted ⇒ // mute case RecoveryFailure(cause) ⇒ - val errorMsg = s"Processor killed after recovery failure (processor id = [${processorId}]). " + + val errorMsg = s"Processor killed after recovery failure (persisten id = [${persistenceId}]). " + "To avoid killing processors on recovery failure, a processor must handle RecoveryFailure messages. " + "RecoveryFailure was caused by: " + cause throw new ActorKilledException(errorMsg) case PersistenceFailure(payload, sequenceNumber, cause) ⇒ val errorMsg = "Processor killed after persistence failure " + - s"(processor id = [${processorId}], sequence nr = [${sequenceNumber}], payload class = [${payload.getClass.getName}]). " + + s"(persistent id = [${persistenceId}], sequence nr = [${sequenceNumber}], payload class = [${payload.getClass.getName}]). " + "To avoid killing processors on persistence failure, a processor must handle PersistenceFailure messages. " + "PersistenceFailure was caused by: " + cause throw new ActorKilledException(errorMsg) diff --git a/akka-persistence/src/main/scala/akka/persistence/Recovery.scala b/akka-persistence/src/main/scala/akka/persistence/Recovery.scala index 138bd46ada..68152cbbdf 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Recovery.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Recovery.scala @@ -87,7 +87,7 @@ trait Recovery extends Actor with Snapshotter with Stash with StashFactory { process(receive, SnapshotOffer(metadata, snapshot)) } _currentState = replayStarted(await = true) - journal ! ReplayMessages(lastSequenceNr + 1L, toSnr, replayMax, processorId, self) + journal ! ReplayMessages(lastSequenceNr + 1L, toSnr, replayMax, persistenceId, self) case other ⇒ receiverStash.stash() } } @@ -171,7 +171,10 @@ trait Recovery extends Actor with Snapshotter with Stash with StashFactory { /** * Id of the processor for which messages should be replayed. */ - def processorId: String + @deprecated("Override `persistenceId` instead. Processor will be removed.", since = "2.3.4") + def processorId: String = extension.persistenceId(self) // TODO: remove processorId + + def persistenceId: String = processorId /** * Returns the current persistent message if there is any. @@ -229,7 +232,7 @@ trait Recovery extends Actor with Snapshotter with Stash with StashFactory { /** * INTERNAL API. */ - private[persistence] lazy val journal = extension.journalFor(processorId) + private[persistence] lazy val journal = extension.journalFor(persistenceId) /** * INTERNAL API. diff --git a/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala b/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala index 851698dce5..0b5ac1a547 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala @@ -8,12 +8,12 @@ package akka.persistence /** * Snapshot metadata. * - * @param processorId id of processor from which the snapshot was taken. + * @param persistenceId id of processor from which the snapshot was taken. * @param sequenceNr sequence number at which the snapshot was taken. * @param timestamp time at which the snapshot was saved. */ @SerialVersionUID(1L) //#snapshot-metadata -final case class SnapshotMetadata(processorId: String, sequenceNr: Long, timestamp: Long = 0L) +final case class SnapshotMetadata(@deprecatedName('processorId) persistenceId: String, sequenceNr: Long, timestamp: Long = 0L) //#snapshot-metadata /** @@ -116,11 +116,11 @@ private[persistence] object SnapshotProtocol { /** * Instructs a snapshot store to load a snapshot. * - * @param processorId processor id. + * @param persistenceId processor id. * @param criteria criteria for selecting a snapshot from which recovery should start. * @param toSequenceNr upper sequence number bound (inclusive) for recovery. */ - final case class LoadSnapshot(processorId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long) + final case class LoadSnapshot(@deprecatedName('processorId) persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long) /** * Response message to a [[LoadSnapshot]] message. @@ -147,8 +147,8 @@ private[persistence] object SnapshotProtocol { /** * Instructs snapshot store to delete all snapshots that match `criteria`. * - * @param processorId processor id. + * @param persistenceId processor id. * @param criteria criteria for selecting snapshots to be deleted. */ - final case class DeleteSnapshots(processorId: String, criteria: SnapshotSelectionCriteria) + final case class DeleteSnapshots(@deprecatedName('processorId) persistenceId: String, criteria: SnapshotSelectionCriteria) } diff --git a/akka-persistence/src/main/scala/akka/persistence/Snapshotter.scala b/akka-persistence/src/main/scala/akka/persistence/Snapshotter.scala index 1118e64822..f03408322b 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Snapshotter.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Snapshotter.scala @@ -23,8 +23,8 @@ trait Snapshotter extends Actor { */ def snapshotSequenceNr: Long - def loadSnapshot(processorId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long) = - snapshotStore ! LoadSnapshot(processorId, criteria, toSequenceNr) + def loadSnapshot(persistenceId: String, criteria: SnapshotSelectionCriteria, toSequenceNr: Long) = + snapshotStore ! LoadSnapshot(persistenceId, criteria, toSequenceNr) /** * Saves a `snapshot` of this snapshotter's state. If saving succeeds, this snapshotter will receive a diff --git a/akka-persistence/src/main/scala/akka/persistence/View.scala b/akka-persistence/src/main/scala/akka/persistence/View.scala index 2a1db64bfa..26ed3a9208 100644 --- a/akka-persistence/src/main/scala/akka/persistence/View.scala +++ b/akka-persistence/src/main/scala/akka/persistence/View.scala @@ -50,7 +50,7 @@ case object Update { * message stream as [[Persistent]] messages. These messages can be processed to update internal state * in order to maintain an (eventual consistent) view of the state of the corresponding processor. A * view can also run on a different node, provided that a replicated journal is used. Implementation - * classes reference a processor by implementing `processorId`. + * classes reference a processor by implementing `persistenceId`. * * Views can also store snapshots of internal state by calling [[#saveSnapshot]]. The snapshots of a view * are independent of those of the referenced processor. During recovery, a saved snapshot is offered @@ -106,7 +106,7 @@ trait View extends Actor with Recovery { case r: Recover ⇒ // ignore case Update(awaitUpdate, replayMax) ⇒ _currentState = replayStarted(await = awaitUpdate) - journal ! ReplayMessages(lastSequenceNr + 1L, Long.MaxValue, replayMax, processorId, self) + journal ! ReplayMessages(lastSequenceNr + 1L, Long.MaxValue, replayMax, persistenceId, self) case other ⇒ process(receive, other) } } @@ -132,7 +132,7 @@ trait View extends Actor with Recovery { if (await) receiverStash.unstashAll() } - private val _viewId = extension.processorId(self) + private val _viewId = extension.persistenceId(self) private val viewSettings = extension.settings.view private var schedule: Option[Cancellable] = None diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala index c9c9a45f49..9724894df2 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala @@ -27,7 +27,7 @@ trait AsyncRecovery { * The channel ids of delivery confirmations that are available for a replayed * message must be contained in that message's `confirms` sequence. * - * @param processorId processor id. + * @param persistenceId processor id. * @param fromSequenceNr sequence number where replay should start (inclusive). * @param toSequenceNr sequence number where replay should end (inclusive). * @param max maximum number of messages to be replayed. @@ -37,16 +37,16 @@ trait AsyncRecovery { * @see [[AsyncWriteJournal]] * @see [[SyncWriteJournal]] */ - def asyncReplayMessages(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: PersistentRepr ⇒ Unit): Future[Unit] + def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: PersistentRepr ⇒ Unit): Future[Unit] /** * Plugin API: asynchronously reads the highest stored sequence number for the - * given `processorId`. + * given `persistenceId`. * - * @param processorId processor id. + * @param persistenceId processor id. * @param fromSequenceNr hint where to start searching for the highest sequence * number. */ - def asyncReadHighestSequenceNr(processorId: String, fromSequenceNr: Long): Future[Long] + def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] //#journal-plugin-api } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index 7676794a8b..a90d95d846 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -43,10 +43,10 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { resequence(WriteMessageFailure(_, e)) } resequencerCounter += resequenceables.length + 1 - case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, processorId, processor, replayDeleted) ⇒ + case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, processor, replayDeleted) ⇒ // Send replayed messages and replay result to processor directly. No need // to resequence replayed messages relative to written and looped messages. - asyncReplayMessages(processorId, fromSequenceNr, toSequenceNr, max) { p ⇒ + asyncReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max) { p ⇒ if (!p.deleted || replayDeleted) processor.tell(ReplayedMessage(p), p.sender) } map { case _ ⇒ ReplayMessagesSuccess @@ -55,10 +55,10 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { } pipeTo (processor) onSuccess { case _ if publish ⇒ context.system.eventStream.publish(r) } - case ReadHighestSequenceNr(fromSequenceNr, processorId, processor) ⇒ + case ReadHighestSequenceNr(fromSequenceNr, persistenceId, processor) ⇒ // Send read highest sequence number to processor directly. No need // to resequence the result relative to written and looped messages. - asyncReadHighestSequenceNr(processorId, fromSequenceNr).map { + asyncReadHighestSequenceNr(persistenceId, fromSequenceNr).map { highest ⇒ ReadHighestSequenceNrSuccess(highest) } recover { case e ⇒ ReadHighestSequenceNrFailure(e) @@ -75,8 +75,8 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { if (publish) context.system.eventStream.publish(d) case Failure(e) ⇒ } - case d @ DeleteMessagesTo(processorId, toSequenceNr, permanent) ⇒ - asyncDeleteMessagesTo(processorId, toSequenceNr, permanent) onComplete { + case d @ DeleteMessagesTo(persistenceId, toSequenceNr, permanent) ⇒ + asyncDeleteMessagesTo(persistenceId, toSequenceNr, permanent) onComplete { case Success(_) ⇒ if (publish) context.system.eventStream.publish(d) case Failure(e) ⇒ } @@ -103,14 +103,14 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { * journal. If `permanent` is set to `false`, the persistent messages are marked as * deleted, otherwise they are permanently deleted. */ - def asyncDeleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean): Future[Unit] + def asyncDeleteMessages(messageIds: immutable.Seq[PersistenceId], permanent: Boolean): Future[Unit] /** * Plugin API: asynchronously deletes all persistent messages up to `toSequenceNr` * (inclusive). If `permanent` is set to `false`, the persistent messages are marked * as deleted, otherwise they are permanently deleted. */ - def asyncDeleteMessagesTo(processorId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit] + def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit] //#journal-plugin-api } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala index 0cb5b8582c..ba1b4ee0ba 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala @@ -43,21 +43,21 @@ private[persistence] trait AsyncWriteProxy extends AsyncWriteJournal with Stash def asyncWriteConfirmations(confirmations: immutable.Seq[PersistentConfirmation]): Future[Unit] = (store ? WriteConfirmations(confirmations)).mapTo[Unit] - def asyncDeleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean): Future[Unit] = + def asyncDeleteMessages(messageIds: immutable.Seq[PersistenceId], permanent: Boolean): Future[Unit] = (store ? DeleteMessages(messageIds, permanent)).mapTo[Unit] - def asyncDeleteMessagesTo(processorId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit] = - (store ? DeleteMessagesTo(processorId, toSequenceNr, permanent)).mapTo[Unit] + def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit] = + (store ? DeleteMessagesTo(persistenceId, toSequenceNr, permanent)).mapTo[Unit] - def asyncReplayMessages(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: (PersistentRepr) ⇒ Unit): Future[Unit] = { + def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: (PersistentRepr) ⇒ Unit): Future[Unit] = { val replayCompletionPromise = Promise[Unit] val mediator = context.actorOf(Props(classOf[ReplayMediator], replayCallback, replayCompletionPromise, timeout.duration).withDeploy(Deploy.local)) - store.tell(ReplayMessages(processorId, fromSequenceNr, toSequenceNr, max), mediator) + store.tell(ReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max), mediator) replayCompletionPromise.future } - def asyncReadHighestSequenceNr(processorId: String, fromSequenceNr: Long): Future[Long] = - (store ? ReadHighestSequenceNr(processorId, fromSequenceNr)).mapTo[Long] + def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = + (store ? ReadHighestSequenceNr(persistenceId, fromSequenceNr)).mapTo[Long] } /** @@ -78,13 +78,13 @@ private[persistence] object AsyncWriteTarget { final case class WriteConfirmations(confirmations: immutable.Seq[PersistentConfirmation]) @SerialVersionUID(1L) - final case class DeleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean) + final case class DeleteMessages(messageIds: immutable.Seq[PersistenceId], permanent: Boolean) @SerialVersionUID(1L) - final case class DeleteMessagesTo(processorId: String, toSequenceNr: Long, permanent: Boolean) + final case class DeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean) @SerialVersionUID(1L) - final case class ReplayMessages(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long) + final case class ReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long) @SerialVersionUID(1L) case object ReplaySuccess @@ -93,7 +93,7 @@ private[persistence] object AsyncWriteTarget { final case class ReplayFailure(cause: Throwable) @SerialVersionUID(1L) - final case class ReadHighestSequenceNr(processorId: String, fromSequenceNr: Long) + final case class ReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long) } /** diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala index 0d43b9e393..e940ffae8a 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala @@ -39,8 +39,8 @@ trait SyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { } throw e } - case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, processorId, processor, replayDeleted) ⇒ - asyncReplayMessages(processorId, fromSequenceNr, toSequenceNr, max) { p ⇒ + case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, processor, replayDeleted) ⇒ + asyncReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max) { p ⇒ if (!p.deleted || replayDeleted) processor.tell(ReplayedMessage(p), p.sender) } map { case _ ⇒ ReplayMessagesSuccess @@ -49,8 +49,8 @@ trait SyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { } pipeTo (processor) onSuccess { case _ if publish ⇒ context.system.eventStream.publish(r) } - case ReadHighestSequenceNr(fromSequenceNr, processorId, processor) ⇒ - asyncReadHighestSequenceNr(processorId, fromSequenceNr).map { + case ReadHighestSequenceNr(fromSequenceNr, persistenceId, processor) ⇒ + asyncReadHighestSequenceNr(persistenceId, fromSequenceNr).map { highest ⇒ ReadHighestSequenceNrSuccess(highest) } recover { case e ⇒ ReadHighestSequenceNrFailure(e) @@ -68,8 +68,8 @@ trait SyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { case Failure(e) ⇒ requestorOption.foreach(_ ! DeleteMessagesFailure(e)) } - case d @ DeleteMessagesTo(processorId, toSequenceNr, permanent) ⇒ - Try(deleteMessagesTo(processorId, toSequenceNr, permanent)) match { + case d @ DeleteMessagesTo(persistenceId, toSequenceNr, permanent) ⇒ + Try(deleteMessagesTo(persistenceId, toSequenceNr, permanent)) match { case Success(_) ⇒ if (publish) context.system.eventStream.publish(d) case Failure(e) ⇒ } @@ -95,13 +95,13 @@ trait SyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { * journal. If `permanent` is set to `false`, the persistent messages are marked as * deleted, otherwise they are permanently deleted. */ - def deleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean): Unit + def deleteMessages(messageIds: immutable.Seq[PersistenceId], permanent: Boolean): Unit /** * Plugin API: synchronously deletes all persistent messages up to `toSequenceNr` * (inclusive). If `permanent` is set to `false`, the persistent messages are marked * as deleted, otherwise they are permanently deleted. */ - def deleteMessagesTo(processorId: String, toSequenceNr: Long, permanent: Boolean): Unit + def deleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Unit //#journal-plugin-api } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala index 327992b22d..51c1e2bd31 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala @@ -37,9 +37,9 @@ private[persistence] trait InmemMessages { // processor id -> persistent message var messages = Map.empty[String, Vector[PersistentRepr]] - def add(p: PersistentRepr) = messages = messages + (messages.get(p.processorId) match { - case Some(ms) ⇒ p.processorId -> (ms :+ p) - case None ⇒ p.processorId -> Vector(p) + def add(p: PersistentRepr) = messages = messages + (messages.get(p.persistenceId) match { + case Some(ms) ⇒ p.persistenceId -> (ms :+ p) + case None ⇒ p.persistenceId -> Vector(p) }) def update(pid: String, snr: Long)(f: PersistentRepr ⇒ PersistentRepr) = messages = messages.get(pid) match { @@ -79,11 +79,11 @@ private[persistence] class InmemStore extends Actor with InmemMessages { case WriteMessages(msgs) ⇒ sender() ! msgs.foreach(add) case WriteConfirmations(cnfs) ⇒ - sender() ! cnfs.foreach(cnf ⇒ update(cnf.processorId, cnf.sequenceNr)(p ⇒ p.update(confirms = cnf.channelId +: p.confirms))) + sender() ! cnfs.foreach(cnf ⇒ update(cnf.persistenceId, cnf.sequenceNr)(p ⇒ p.update(confirms = cnf.channelId +: p.confirms))) case DeleteMessages(msgIds, false) ⇒ - sender() ! msgIds.foreach(msgId ⇒ update(msgId.processorId, msgId.sequenceNr)(_.update(deleted = true))) + sender() ! msgIds.foreach(msgId ⇒ update(msgId.persistenceId, msgId.sequenceNr)(_.update(deleted = true))) case DeleteMessages(msgIds, true) ⇒ - sender() ! msgIds.foreach(msgId ⇒ delete(msgId.processorId, msgId.sequenceNr)) + sender() ! msgIds.foreach(msgId ⇒ delete(msgId.persistenceId, msgId.sequenceNr)) case DeleteMessagesTo(pid, tsnr, false) ⇒ sender() ! (1L to tsnr foreach { snr ⇒ update(pid, snr)(_.update(deleted = true)) }) case DeleteMessagesTo(pid, tsnr, true) ⇒ @@ -91,7 +91,7 @@ private[persistence] class InmemStore extends Actor with InmemMessages { case ReplayMessages(pid, fromSnr, toSnr, max) ⇒ read(pid, fromSnr, toSnr, max).foreach(sender() ! _) sender() ! ReplaySuccess - case ReadHighestSequenceNr(processorId, _) ⇒ - sender() ! highestSequenceNr(processorId) + case ReadHighestSequenceNr(persistenceId, _) ⇒ + sender() ! highestSequenceNr(persistenceId) } } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncRecovery.scala b/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncRecovery.scala index 5b3616dbf5..392a074d85 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncRecovery.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncRecovery.scala @@ -17,11 +17,11 @@ import akka.persistence.PersistentRepr abstract class AsyncRecovery extends SAsyncReplay with AsyncRecoveryPlugin { this: Actor ⇒ import context.dispatcher - final def asyncReplayMessages(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: (PersistentRepr) ⇒ Unit) = - doAsyncReplayMessages(processorId, fromSequenceNr, toSequenceNr, max, new Procedure[PersistentRepr] { + final def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: (PersistentRepr) ⇒ Unit) = + doAsyncReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max, new Procedure[PersistentRepr] { def apply(p: PersistentRepr) = replayCallback(p) }).map(Unit.unbox) - final def asyncReadHighestSequenceNr(processorId: String, fromSequenceNr: Long): Future[Long] = - doAsyncReadHighestSequenceNr(processorId, fromSequenceNr: Long).map(_.longValue) + final def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = + doAsyncReadHighestSequenceNr(persistenceId, fromSequenceNr: Long).map(_.longValue) } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala index 9cdac531e2..e33ba02d2d 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala @@ -22,9 +22,9 @@ abstract class AsyncWriteJournal extends AsyncRecovery with SAsyncWriteJournal w final def asyncWriteConfirmations(confirmations: immutable.Seq[PersistentConfirmation]) = doAsyncWriteConfirmations(confirmations.asJava).map(Unit.unbox) - final def asyncDeleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean) = + final def asyncDeleteMessages(messageIds: immutable.Seq[PersistenceId], permanent: Boolean) = doAsyncDeleteMessages(messageIds.asJava, permanent).map(Unit.unbox) - final def asyncDeleteMessagesTo(processorId: String, toSequenceNr: Long, permanent: Boolean) = - doAsyncDeleteMessagesTo(processorId, toSequenceNr, permanent).map(Unit.unbox) + final def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean) = + doAsyncDeleteMessagesTo(persistenceId, toSequenceNr, permanent).map(Unit.unbox) } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala index 0f9e82e877..033eb0be7c 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala @@ -20,9 +20,9 @@ abstract class SyncWriteJournal extends AsyncRecovery with SSyncWriteJournal wit final def writeConfirmations(confirmations: immutable.Seq[PersistentConfirmation]) = doWriteConfirmations(confirmations.asJava) - final def deleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean) = + final def deleteMessages(messageIds: immutable.Seq[PersistenceId], permanent: Boolean) = doDeleteMessages(messageIds.asJava, permanent) - final def deleteMessagesTo(processorId: String, toSequenceNr: Long, permanent: Boolean) = - doDeleteMessagesTo(processorId, toSequenceNr, permanent) + final def deleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean) = + doDeleteMessagesTo(persistenceId, toSequenceNr, permanent) } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbKey.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbKey.scala index 0a1f98217c..2756a11827 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbKey.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbKey.scala @@ -11,14 +11,14 @@ import java.nio.ByteBuffer * LevelDB key. */ private[leveldb] final case class Key( - processorId: Int, + persistenceId: Int, sequenceNr: Long, channelId: Int) private[leveldb] object Key { def keyToBytes(key: Key): Array[Byte] = { val bb = ByteBuffer.allocate(20) - bb.putInt(key.processorId) + bb.putInt(key.persistenceId) bb.putLong(key.sequenceNr) bb.putInt(key.channelId) bb.array @@ -32,15 +32,15 @@ private[leveldb] object Key { new Key(aid, snr, cid) } - def counterKey(processorId: Int): Key = Key(processorId, 0L, 0) + def counterKey(persistenceId: Int): Key = Key(persistenceId, 0L, 0) def counterToBytes(ctr: Long): Array[Byte] = ByteBuffer.allocate(8).putLong(ctr).array def counterFromBytes(bytes: Array[Byte]): Long = ByteBuffer.wrap(bytes).getLong def id(key: Key) = key.channelId def idKey(id: Int) = Key(1, 0L, id) - def isIdKey(key: Key): Boolean = key.processorId == 1 + def isIdKey(key: Key): Boolean = key.persistenceId == 1 - def deletionKey(processorId: Int, sequenceNr: Long): Key = Key(processorId, sequenceNr, 1) + def deletionKey(persistenceId: Int, sequenceNr: Long): Key = Key(persistenceId, sequenceNr, 1) def isDeletionKey(key: Key): Boolean = key.channelId == 1 } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbRecovery.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbRecovery.scala index 618fa3f65c..5f1acef194 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbRecovery.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbRecovery.scala @@ -22,17 +22,17 @@ private[persistence] trait LeveldbRecovery extends AsyncRecovery { this: Leveldb private lazy val replayDispatcherId = config.getString("replay-dispatcher") private lazy val replayDispatcher = context.system.dispatchers.lookup(replayDispatcherId) - def asyncReadHighestSequenceNr(processorId: String, fromSequenceNr: Long): Future[Long] = { - val nid = numericId(processorId) + def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = { + val nid = numericId(persistenceId) Future(readHighestSequenceNr(nid))(replayDispatcher) } - def asyncReplayMessages(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: PersistentRepr ⇒ Unit): Future[Unit] = { - val nid = numericId(processorId) + def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: PersistentRepr ⇒ Unit): Future[Unit] = { + val nid = numericId(persistenceId) Future(replayMessages(nid, fromSequenceNr: Long, toSequenceNr, max: Long)(replayCallback))(replayDispatcher) } - def replayMessages(processorId: Int, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: PersistentRepr ⇒ Unit): Unit = { + def replayMessages(persistenceId: Int, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: PersistentRepr ⇒ Unit): Unit = { @scala.annotation.tailrec def go(iter: DBIterator, key: Key, ctr: Long, replayCallback: PersistentRepr ⇒ Unit) { if (iter.hasNext) { @@ -43,7 +43,7 @@ private[persistence] trait LeveldbRecovery extends AsyncRecovery { this: Leveldb } else if (nextKey.channelId != 0) { // phantom confirmation (just advance iterator) go(iter, nextKey, ctr, replayCallback) - } else if (key.processorId == nextKey.processorId) { + } else if (key.persistenceId == nextKey.persistenceId) { val msg = persistentFromBytes(nextEntry.getValue) val del = deletion(iter, nextKey) val cnf = confirms(iter, nextKey, Nil) @@ -60,7 +60,7 @@ private[persistence] trait LeveldbRecovery extends AsyncRecovery { this: Leveldb if (iter.hasNext) { val nextEntry = iter.peekNext() val nextKey = keyFromBytes(nextEntry.getKey) - if (key.processorId == nextKey.processorId && key.sequenceNr == nextKey.sequenceNr) { + if (key.persistenceId == nextKey.persistenceId && key.sequenceNr == nextKey.sequenceNr) { val nextValue = new String(nextEntry.getValue, "UTF-8") iter.next() confirms(iter, nextKey, nextValue :: channelIds) @@ -72,7 +72,7 @@ private[persistence] trait LeveldbRecovery extends AsyncRecovery { this: Leveldb if (iter.hasNext) { val nextEntry = iter.peekNext() val nextKey = keyFromBytes(nextEntry.getKey) - if (key.processorId == nextKey.processorId && key.sequenceNr == nextKey.sequenceNr && isDeletionKey(nextKey)) { + if (key.persistenceId == nextKey.persistenceId && key.sequenceNr == nextKey.sequenceNr && isDeletionKey(nextKey)) { iter.next() true } else false @@ -80,16 +80,16 @@ private[persistence] trait LeveldbRecovery extends AsyncRecovery { this: Leveldb } withIterator { iter ⇒ - val startKey = Key(processorId, if (fromSequenceNr < 1L) 1L else fromSequenceNr, 0) + val startKey = Key(persistenceId, if (fromSequenceNr < 1L) 1L else fromSequenceNr, 0) iter.seek(keyToBytes(startKey)) go(iter, startKey, 0L, replayCallback) } } - def readHighestSequenceNr(processorId: Int) = { + def readHighestSequenceNr(persistenceId: Int) = { val ro = leveldbSnapshot() try { - leveldb.get(keyToBytes(counterKey(processorId)), ro) match { + leveldb.get(keyToBytes(counterKey(persistenceId)), ro) match { case null ⇒ 0L case bytes ⇒ counterFromBytes(bytes) } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala index 00d5816f24..bc99a34324 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala @@ -50,15 +50,15 @@ private[persistence] trait LeveldbStore extends Actor with LeveldbIdMapping with def writeConfirmations(confirmations: immutable.Seq[PersistentConfirmation]) = withBatch(batch ⇒ confirmations.foreach(confirmation ⇒ addToConfirmationBatch(confirmation, batch))) - def deleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean) = withBatch { batch ⇒ + def deleteMessages(messageIds: immutable.Seq[PersistenceId], permanent: Boolean) = withBatch { batch ⇒ messageIds foreach { messageId ⇒ - if (permanent) batch.delete(keyToBytes(Key(numericId(messageId.processorId), messageId.sequenceNr, 0))) - else batch.put(keyToBytes(deletionKey(numericId(messageId.processorId), messageId.sequenceNr)), Array.emptyByteArray) + if (permanent) batch.delete(keyToBytes(Key(numericId(messageId.persistenceId), messageId.sequenceNr, 0))) + else batch.put(keyToBytes(deletionKey(numericId(messageId.persistenceId), messageId.sequenceNr)), Array.emptyByteArray) } } - def deleteMessagesTo(processorId: String, toSequenceNr: Long, permanent: Boolean) = withBatch { batch ⇒ - val nid = numericId(processorId) + def deleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean) = withBatch { batch ⇒ + val nid = numericId(persistenceId) // seek to first existing message val fromSequenceNr = withIterator { iter ⇒ @@ -101,13 +101,13 @@ private[persistence] trait LeveldbStore extends Actor with LeveldbIdMapping with def persistentFromBytes(a: Array[Byte]): PersistentRepr = serialization.deserialize(a, classOf[PersistentRepr]).get private def addToMessageBatch(persistent: PersistentRepr, batch: WriteBatch): Unit = { - val nid = numericId(persistent.processorId) + val nid = numericId(persistent.persistenceId) batch.put(keyToBytes(counterKey(nid)), counterToBytes(persistent.sequenceNr)) batch.put(keyToBytes(Key(nid, persistent.sequenceNr, 0)), persistentToBytes(persistent)) } private def addToConfirmationBatch(confirmation: PersistentConfirmation, batch: WriteBatch): Unit = { - val npid = numericId(confirmation.processorId) + val npid = numericId(confirmation.persistenceId) val ncid = numericId(confirmation.channelId) batch.put(keyToBytes(Key(npid, confirmation.sequenceNr, ncid)), confirmation.channelId.getBytes("UTF-8")) } diff --git a/akka-persistence/src/main/scala/akka/persistence/package.scala b/akka-persistence/src/main/scala/akka/persistence/package.scala index e43e131973..3226941d76 100644 --- a/akka-persistence/src/main/scala/akka/persistence/package.scala +++ b/akka-persistence/src/main/scala/akka/persistence/package.scala @@ -8,7 +8,7 @@ package akka package object persistence { implicit val snapshotMetadataOrdering = new Ordering[SnapshotMetadata] { def compare(x: SnapshotMetadata, y: SnapshotMetadata) = - if (x.processorId == y.processorId) math.signum(x.sequenceNr - y.sequenceNr).toInt - else x.processorId.compareTo(y.processorId) + if (x.persistenceId == y.persistenceId) math.signum(x.sequenceNr - y.sequenceNr).toInt + else x.persistenceId.compareTo(y.persistenceId) } } diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala index 5b50b7e367..e8f978090e 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala @@ -30,7 +30,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { val PersistentImplClass = classOf[PersistentImpl] val ConfirmablePersistentImplClass = classOf[ConfirmablePersistentImpl] val DeliveredByTransientChannelClass = classOf[DeliveredByChannel] - val DeliveredByPersistentChannelClass = classOf[DeliveredByPersistentChannel] + val DeliveredByPersistentChannelClass = classOf[DeliveredByPersistenceChannel] val DeliverClass = classOf[Deliver] def identifier: Int = 7 @@ -47,12 +47,12 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { * serialization of a persistent message's payload to a matching `akka.serialization.Serializer`. */ def toBinary(o: AnyRef): Array[Byte] = o match { - case b: PersistentBatch ⇒ persistentMessageBatchBuilder(b).build().toByteArray - case p: PersistentRepr ⇒ persistentMessageBuilder(p).build().toByteArray - case c: DeliveredByChannel ⇒ deliveredMessageBuilder(c).build().toByteArray - case c: DeliveredByPersistentChannel ⇒ deliveredMessageBuilder(c).build().toByteArray - case d: Deliver ⇒ deliverMessageBuilder(d).build.toByteArray - case _ ⇒ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}") + case b: PersistentBatch ⇒ persistentMessageBatchBuilder(b).build().toByteArray + case p: PersistentRepr ⇒ persistentMessageBuilder(p).build().toByteArray + case c: DeliveredByChannel ⇒ deliveredMessageBuilder(c).build().toByteArray + case c: DeliveredByPersistenceChannel ⇒ deliveredMessageBuilder(c).build().toByteArray + case d: Deliver ⇒ deliverMessageBuilder(d).build.toByteArray + case _ ⇒ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}") } /** @@ -95,7 +95,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { private def persistentMessageBuilder(persistent: PersistentRepr) = { val builder = PersistentMessage.newBuilder - if (persistent.processorId != Undefined) builder.setProcessorId(persistent.processorId) + if (persistent.persistenceId != Undefined) builder.setPersistenceId(persistent.persistenceId) if (persistent.confirmMessage != null) builder.setConfirmMessage(deliveredMessageBuilder(persistent.confirmMessage)) if (persistent.confirmTarget != null) builder.setConfirmTarget(Serialization.serializedActorPath(persistent.confirmTarget)) if (persistent.sender != null) builder.setSender(Serialization.serializedActorPath(persistent.sender)) @@ -115,7 +115,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { val serializer = SerializationExtension(system).findSerializerFor(payload) val builder = PersistentPayload.newBuilder() - if (serializer.includeManifest) builder.setPayloadManifest((ByteString.copyFromUtf8(payload.getClass.getName))) + if (serializer.includeManifest) builder.setPayloadManifest(ByteString.copyFromUtf8(payload.getClass.getName)) builder.setPayload(ByteString.copyFrom(serializer.toBinary(payload))) builder.setSerializerId(serializer.identifier) @@ -139,7 +139,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { builder.setDeliverySequenceNr(delivered.deliverySequenceNr) delivered match { - case c: DeliveredByChannel ⇒ builder.setProcessorId(c.processorId) + case c: DeliveredByChannel ⇒ builder.setPersistenceId(c.persistenceId) case _ ⇒ builder } } @@ -161,7 +161,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { PersistentRepr( payload(persistentMessage.getPayload), persistentMessage.getSequenceNr, - if (persistentMessage.hasProcessorId) persistentMessage.getProcessorId else Undefined, + if (persistentMessage.hasPersistenceId) persistentMessage.getPersistenceId else Undefined, persistentMessage.getDeleted, persistentMessage.getRedeliveries, immutableSeq(persistentMessage.getConfirmsList), @@ -184,15 +184,15 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { private def delivered(deliveredMessage: DeliveredMessage): Delivered = { val channel = if (deliveredMessage.hasChannel) system.provider.resolveActorRef(deliveredMessage.getChannel) else null - if (deliveredMessage.hasProcessorId) { + if (deliveredMessage.hasPersistenceId) { DeliveredByChannel( - deliveredMessage.getProcessorId, + deliveredMessage.getPersistenceId, deliveredMessage.getChannelId, deliveredMessage.getPersistentSequenceNr, deliveredMessage.getDeliverySequenceNr, channel) } else { - DeliveredByPersistentChannel( + DeliveredByPersistenceChannel( deliveredMessage.getChannelId, deliveredMessage.getPersistentSequenceNr, deliveredMessage.getDeliverySequenceNr, diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala index 297158ac4f..3203fc0068 100644 --- a/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala @@ -22,13 +22,13 @@ trait SnapshotStore extends Actor { private val publish = extension.settings.internal.publishPluginCommands final def receive = { - case LoadSnapshot(processorId, criteria, toSequenceNr) ⇒ + case LoadSnapshot(persistenceId, criteria, toSequenceNr) ⇒ val p = sender() - loadAsync(processorId, criteria.limit(toSequenceNr)) map { + loadAsync(persistenceId, criteria.limit(toSequenceNr)) map { sso ⇒ LoadSnapshotResult(sso, toSequenceNr) } recover { case e ⇒ LoadSnapshotResult(None, toSequenceNr) - } pipeTo (p) + } pipeTo p case SaveSnapshot(metadata, snapshot) ⇒ val p = sender() val md = metadata.copy(timestamp = System.currentTimeMillis) @@ -58,7 +58,7 @@ trait SnapshotStore extends Actor { * @param processorId processor id. * @param criteria selection criteria for loading. */ - def loadAsync(processorId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] + def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] /** * Plugin API: asynchronously saves a snapshot. @@ -86,9 +86,9 @@ trait SnapshotStore extends Actor { /** * Plugin API: deletes all snapshots matching `criteria`. * - * @param processorId processor id. + * @param persistenceId processor id. * @param criteria selection criteria for deleting. */ - def delete(processorId: String, criteria: SnapshotSelectionCriteria) + def delete(persistenceId: String, criteria: SnapshotSelectionCriteria) //#snapshot-store-plugin-api } diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/japi/SnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/japi/SnapshotStore.scala index e00c2038db..1d693eec61 100644 --- a/akka-persistence/src/main/scala/akka/persistence/snapshot/japi/SnapshotStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/japi/SnapshotStore.scala @@ -16,8 +16,8 @@ import akka.persistence.snapshot.{ SnapshotStore ⇒ SSnapshotStore } abstract class SnapshotStore extends SSnapshotStore with SnapshotStorePlugin { import context.dispatcher - final def loadAsync(processorId: String, criteria: SnapshotSelectionCriteria) = - doLoadAsync(processorId, criteria).map(_.asScala) + final def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria) = + doLoadAsync(persistenceId, criteria).map(_.asScala) final def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = doSaveAsync(metadata, snapshot).map(Unit.unbox) @@ -28,7 +28,7 @@ abstract class SnapshotStore extends SSnapshotStore with SnapshotStorePlugin { final def delete(metadata: SnapshotMetadata) = doDelete(metadata) - final def delete(processorId: String, criteria: SnapshotSelectionCriteria) = - doDelete(processorId: String, criteria: SnapshotSelectionCriteria) + final def delete(persistenceId: String, criteria: SnapshotSelectionCriteria) = + doDelete(persistenceId: String, criteria: SnapshotSelectionCriteria) } diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala index e43bd6b17a..40a41c20e5 100644 --- a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala @@ -33,7 +33,7 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo private val serializationExtension = SerializationExtension(context.system) private var saving = immutable.Set.empty[SnapshotMetadata] // saving in progress - def loadAsync(processorId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = { + def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = { // // Heuristics: // @@ -44,7 +44,7 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo // // TODO: make number of loading attempts configurable // - val metadata = snapshotMetadata(processorId, criteria).sorted.takeRight(3) + val metadata = snapshotMetadata(persistenceId, criteria).sorted.takeRight(3) Future(load(metadata))(streamDispatcher) } @@ -62,8 +62,8 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo snapshotFile(metadata).delete() } - def delete(processorId: String, criteria: SnapshotSelectionCriteria) = { - snapshotMetadata(processorId, criteria).foreach(delete) + def delete(persistenceId: String, criteria: SnapshotSelectionCriteria) = { + snapshotMetadata(persistenceId, criteria).foreach(delete) } @scala.annotation.tailrec @@ -102,10 +102,10 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo try { p(stream) } finally { stream.close() } private def snapshotFile(metadata: SnapshotMetadata, extension: String = ""): File = - new File(snapshotDir, s"snapshot-${URLEncoder.encode(metadata.processorId, "UTF-8")}-${metadata.sequenceNr}-${metadata.timestamp}${extension}") + new File(snapshotDir, s"snapshot-${URLEncoder.encode(metadata.persistenceId, "UTF-8")}-${metadata.sequenceNr}-${metadata.timestamp}${extension}") - private def snapshotMetadata(processorId: String, criteria: SnapshotSelectionCriteria): immutable.Seq[SnapshotMetadata] = - snapshotDir.listFiles(new SnapshotFilenameFilter(processorId)).map(_.getName).collect { + private def snapshotMetadata(persistenceId: String, criteria: SnapshotSelectionCriteria): immutable.Seq[SnapshotMetadata] = + snapshotDir.listFiles(new SnapshotFilenameFilter(persistenceId)).map(_.getName).collect { case FilenamePattern(pid, snr, tms) ⇒ SnapshotMetadata(URLDecoder.decode(pid, "UTF-8"), snr.toLong, tms.toLong) }.filter(md ⇒ criteria.matches(md) && !saving.contains(md)).toVector @@ -119,7 +119,7 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo super.preStart() } - private class SnapshotFilenameFilter(processorId: String) extends FilenameFilter { - def accept(dir: File, name: String): Boolean = name.startsWith(s"snapshot-${URLEncoder.encode(processorId, "UTF-8")}") + private class SnapshotFilenameFilter(persistenceId: String) extends FilenameFilter { + def accept(dir: File, name: String): Boolean = name.startsWith(s"snapshot-${URLEncoder.encode(persistenceId, "UTF-8")}") } } diff --git a/akka-persistence/src/test/scala/akka/persistence/FailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/FailureSpec.scala index 40fea2a963..26053d1a4c 100644 --- a/akka-persistence/src/test/scala/akka/persistence/FailureSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/FailureSpec.scala @@ -61,7 +61,7 @@ object FailureSpec { val channel = context.actorOf(Channel.props("channel", ChannelSettings(redeliverMax = 10, redeliverInterval = 500 milliseconds)), "channel") - override def processorId = "chaos" + override def persistenceId = "chaos" def receive = { case p @ Persistent(i: Int, _) ⇒ diff --git a/akka-persistence/src/test/scala/akka/persistence/NumberProcessorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/NumberProcessorSpec.scala index 22e725c183..06ad3a9ab2 100644 --- a/akka-persistence/src/test/scala/akka/persistence/NumberProcessorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/NumberProcessorSpec.scala @@ -22,7 +22,6 @@ object NumberProcessorSpec { case object GetNumber class NumberProcessorWithPersistentChannel(name: String) extends NamedProcessor(name) { - override def processorId = name var num = 0 val channel = context.actorOf(PersistentChannel.props(channelId = "stable_id", @@ -52,7 +51,7 @@ class NumberProcessorSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "Nu "resurrect with the correct state, not replaying confirmed messages to clients" in { val deliveredProbe = TestProbe() - system.eventStream.subscribe(deliveredProbe.testActor, classOf[DeliveredByPersistentChannel]) + system.eventStream.subscribe(deliveredProbe.testActor, classOf[DeliveredByPersistenceChannel]) val probe = TestProbe() @@ -63,7 +62,7 @@ class NumberProcessorSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "Nu zero.confirm() zero.payload should equal(0) - deliveredProbe.expectMsgType[DeliveredByPersistentChannel] + deliveredProbe.expectMsgType[DeliveredByPersistenceChannel] processor.tell(Persistent(DecrementAndGet), probe.testActor) @@ -71,7 +70,7 @@ class NumberProcessorSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "Nu decrementFrom0.confirm() decrementFrom0.payload should equal(-1) - deliveredProbe.expectMsgType[DeliveredByPersistentChannel] + deliveredProbe.expectMsgType[DeliveredByPersistenceChannel] watch(processor) system.stop(processor) @@ -84,7 +83,7 @@ class NumberProcessorSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "Nu decrementFromMinus1.confirm() decrementFromMinus1.payload should equal(-2) - deliveredProbe.expectMsgType[DeliveredByPersistentChannel] + deliveredProbe.expectMsgType[DeliveredByPersistenceChannel] } } } diff --git a/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala index d1dc7e1c35..86484691e7 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala @@ -30,8 +30,8 @@ object PerformanceSpec { var startTime: Long = 0L var stopTime: Long = 0L - var startSequenceNr = 0L; - var stopSequenceNr = 0L; + var startSequenceNr = 0L + var stopSequenceNr = 0L def startMeasure(): Unit = { startSequenceNr = lastSequenceNr @@ -169,9 +169,9 @@ class PerformanceSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "Perfor def stressPersistentChannel(): Unit = { val channel = system.actorOf(PersistentChannel.props()) val destination = system.actorOf(Props[PerformanceTestDestination]) - 1 to warmupCycles foreach { i ⇒ channel ! Deliver(PersistentRepr(s"msg${i}", processorId = "test"), destination.path) } + 1 to warmupCycles foreach { i ⇒ channel ! Deliver(PersistentRepr(s"msg${i}", persistenceId = "test"), destination.path) } channel ! Deliver(Persistent(StartMeasure), destination.path) - 1 to loadCycles foreach { i ⇒ channel ! Deliver(PersistentRepr(s"msg${i}", processorId = "test"), destination.path) } + 1 to loadCycles foreach { i ⇒ channel ! Deliver(PersistentRepr(s"msg${i}", persistenceId = "test"), destination.path) } channel ! Deliver(Persistent(StopMeasure), destination.path) expectMsgPF(100 seconds) { case throughput: Double ⇒ println(f"\nthroughput = $throughput%.2f persistent messages per second") @@ -179,10 +179,10 @@ class PerformanceSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "Perfor } def subscribeToConfirmation(probe: TestProbe): Unit = - system.eventStream.subscribe(probe.ref, classOf[DeliveredByPersistentChannel]) + system.eventStream.subscribe(probe.ref, classOf[DeliveredByPersistenceChannel]) def awaitConfirmation(probe: TestProbe): Unit = - probe.expectMsgType[DeliveredByPersistentChannel] + probe.expectMsgType[DeliveredByPersistenceChannel] "A command sourced processor" should { "have some reasonable throughput" in { @@ -213,7 +213,7 @@ class PerformanceSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "Perfor stressPersistentChannel() probe.fishForMessage(100.seconds) { - case DeliveredByPersistentChannel(_, snr, _, _) ⇒ snr == warmupCycles + loadCycles + 2 + case DeliveredByPersistenceChannel(_, snr, _, _) ⇒ snr == warmupCycles + loadCycles + 2 } } } diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala index 66e32c8707..c4ec7a2449 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala @@ -77,7 +77,7 @@ trait Cleanup { this: AkkaSpec ⇒ } abstract class NamedProcessor(name: String) extends Processor { - override def processorId: String = name + override def persistenceId: String = name } trait TurnOffRecoverOnStart { this: Processor ⇒ diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentChannelSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentChannelSpec.scala index e4e081b775..bc0e273f37 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentChannelSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentChannelSpec.scala @@ -71,8 +71,8 @@ abstract class PersistentChannelSpec(config: Config) extends ChannelSpec(config) "not modify certain persistent message fields" in { val destProbe = TestProbe() - val persistent1 = PersistentRepr(payload = "a", processorId = "p1", confirms = List("c1", "c2"), sender = defaultTestChannel, sequenceNr = 13) - val persistent2 = PersistentRepr(payload = "b", processorId = "p1", confirms = List("c1", "c2"), sender = defaultTestChannel) + val persistent1 = PersistentRepr(payload = "a", persistenceId = "p1", confirms = List("c1", "c2"), sender = defaultTestChannel, sequenceNr = 13) + val persistent2 = PersistentRepr(payload = "b", persistenceId = "p1", confirms = List("c1", "c2"), sender = defaultTestChannel) defaultTestChannel ! Deliver(persistent1, destProbe.ref.path) defaultTestChannel ! Deliver(persistent2, destProbe.ref.path) diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotDirectoryFailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotDirectoryFailureSpec.scala index de60fc8dfa..8b4863e2c5 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotDirectoryFailureSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotDirectoryFailureSpec.scala @@ -13,7 +13,7 @@ object SnapshotDirectoryFailureSpec { class TestProcessor(name: String, probe: ActorRef) extends Processor { - override def processorId: String = name + override def persistenceId: String = name override def preStart(): Unit = () diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala index 56b69afbd8..0746352130 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala @@ -53,7 +53,7 @@ class SnapshotFailureRobustnessSpec extends AkkaSpec(PersistenceSpec.config("lev "A processor with a failing snapshot" must { "recover state starting from the most recent complete snapshot" in { val sProcessor = system.actorOf(Props(classOf[SaveSnapshotTestProcessor], name, testActor)) - val processorId = name + val persistenceId = name expectMsg(RecoveryCompleted) sProcessor ! Persistent("blahonga") @@ -67,7 +67,7 @@ class SnapshotFailureRobustnessSpec extends AkkaSpec(PersistenceSpec.config("lev val lProcessor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor)) lProcessor ! Recover() expectMsgPF() { - case (SnapshotMetadata(`processorId`, 1, timestamp), state) ⇒ + case (SnapshotMetadata(`persistenceId`, 1, timestamp), state) ⇒ state should be("blahonga") timestamp should be > (0L) } diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala index 13beeb8d0e..f776da7490 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotSerializationSpec.scala @@ -77,14 +77,14 @@ class SnapshotSerializationSpec extends AkkaSpec(PersistenceSpec.config("leveldb "A processor with custom Serializer" must { "be able to handle serialization header of more than 255 bytes" in { val sProcessor = system.actorOf(Props(classOf[TestProcessor], name, testActor)) - val processorId = name + val persistenceId = name sProcessor ! "blahonga" expectMsg(0) val lProcessor = system.actorOf(Props(classOf[TestProcessor], name, testActor)) lProcessor ! Recover() expectMsgPF() { - case (SnapshotMetadata(`processorId`, 0, timestamp), state) ⇒ + case (SnapshotMetadata(`persistenceId`, 0, timestamp), state) ⇒ state should be(new MySnapshot("blahonga")) timestamp should be > (0L) } diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala index 5176f60cd0..2df4db483f 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala @@ -64,12 +64,12 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS "A processor" must { "recover state starting from the most recent snapshot" in { val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor)) - val processorId = name + val persistenceId = name processor ! Recover() expectMsgPF() { - case (SnapshotMetadata(`processorId`, 4, timestamp), state) ⇒ + case (SnapshotMetadata(`persistenceId`, 4, timestamp), state) ⇒ state should be(List("a-1", "b-2", "c-3", "d-4").reverse) timestamp should be > (0L) } @@ -79,12 +79,12 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS } "recover state starting from the most recent snapshot matching an upper sequence number bound" in { val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor)) - val processorId = name + val persistenceId = name processor ! Recover(toSequenceNr = 3) expectMsgPF() { - case (SnapshotMetadata(`processorId`, 2, timestamp), state) ⇒ + case (SnapshotMetadata(`persistenceId`, 2, timestamp), state) ⇒ state should be(List("a-1", "b-2").reverse) timestamp should be > (0L) } @@ -93,13 +93,13 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS } "recover state starting from the most recent snapshot matching an upper sequence number bound (without further replay)" in { val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor)) - val processorId = name + val persistenceId = name processor ! Recover(toSequenceNr = 4) processor ! "done" expectMsgPF() { - case (SnapshotMetadata(`processorId`, 4, timestamp), state) ⇒ + case (SnapshotMetadata(`persistenceId`, 4, timestamp), state) ⇒ state should be(List("a-1", "b-2", "c-3", "d-4").reverse) timestamp should be > (0L) } @@ -108,12 +108,12 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS } "recover state starting from the most recent snapshot matching criteria" in { val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor)) - val processorId = name + val persistenceId = name processor ! Recover(fromSnapshot = SnapshotSelectionCriteria(maxSequenceNr = 2)) expectMsgPF() { - case (SnapshotMetadata(`processorId`, 2, timestamp), state) ⇒ + case (SnapshotMetadata(`persistenceId`, 2, timestamp), state) ⇒ state should be(List("a-1", "b-2").reverse) timestamp should be > (0L) } @@ -125,12 +125,12 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS } "recover state starting from the most recent snapshot matching criteria and an upper sequence number bound" in { val processor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor)) - val processorId = name + val persistenceId = name processor ! Recover(fromSnapshot = SnapshotSelectionCriteria(maxSequenceNr = 2), toSequenceNr = 3) expectMsgPF() { - case (SnapshotMetadata(`processorId`, 2, timestamp), state) ⇒ + case (SnapshotMetadata(`persistenceId`, 2, timestamp), state) ⇒ state should be(List("a-1", "b-2").reverse) timestamp should be > (0L) } @@ -151,7 +151,7 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS val deleteProbe = TestProbe() val processor1 = system.actorOf(Props(classOf[DeleteSnapshotTestProcessor], name, testActor)) - val processorId = name + val persistenceId = name system.eventStream.subscribe(deleteProbe.ref, classOf[DeleteSnapshot]) @@ -160,7 +160,7 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS processor1 ! "done" val metadata = expectMsgPF() { - case (md @ SnapshotMetadata(`processorId`, 4, _), state) ⇒ + case (md @ SnapshotMetadata(`persistenceId`, 4, _), state) ⇒ state should be(List("a-1", "b-2", "c-3", "d-4").reverse) md } @@ -175,7 +175,7 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS processor2 ! Recover(toSequenceNr = 4) expectMsgPF() { - case (md @ SnapshotMetadata(`processorId`, 2, _), state) ⇒ + case (md @ SnapshotMetadata(`persistenceId`, 2, _), state) ⇒ state should be(List("a-1", "b-2").reverse) md } @@ -187,7 +187,7 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS val deleteProbe = TestProbe() val processor1 = system.actorOf(Props(classOf[DeleteSnapshotTestProcessor], name, testActor)) - val processorId = name + val persistenceId = name system.eventStream.subscribe(deleteProbe.ref, classOf[DeleteSnapshots]) @@ -195,7 +195,7 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotS processor1 ! Recover(toSequenceNr = 4) processor1 ! DeleteN(SnapshotSelectionCriteria(maxSequenceNr = 4)) expectMsgPF() { - case (md @ SnapshotMetadata(`processorId`, 4, _), state) ⇒ + case (md @ SnapshotMetadata(`persistenceId`, 4, _), state) ⇒ state should be(List("a-1", "b-2", "c-3", "d-4").reverse) } expectMsg(RecoveryCompleted) diff --git a/akka-persistence/src/test/scala/akka/persistence/ViewSpec.scala b/akka-persistence/src/test/scala/akka/persistence/ViewSpec.scala index e1ead270eb..2fc01670ef 100644 --- a/akka-persistence/src/test/scala/akka/persistence/ViewSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/ViewSpec.scala @@ -27,7 +27,7 @@ object ViewSpec { this(name, probe, 100.milliseconds) override def autoUpdateInterval: FiniteDuration = interval.dilated(context.system) - override val processorId: String = name + override val persistenceId: String = name var last: String = _ @@ -50,7 +50,7 @@ object ViewSpec { } class PassiveTestView(name: String, probe: ActorRef, var failAt: Option[String]) extends View { - override val processorId: String = name + override val persistenceId: String = name override def autoUpdate: Boolean = false override def autoUpdateReplayMax: Long = 0L // no message replay during initial recovery @@ -70,10 +70,11 @@ object ViewSpec { super.postRestart(reason) failAt = None } + } class ActiveTestView(name: String, probe: ActorRef) extends View { - override val processorId: String = name + override val persistenceId: String = name override def autoUpdateInterval: FiniteDuration = 50.millis override def autoUpdateReplayMax: Long = 2 @@ -92,8 +93,8 @@ object ViewSpec { } class EmittingView(name: String, destination: ActorRef) extends View { + override val persistenceId: String = name override def autoUpdateInterval: FiniteDuration = 100.milliseconds.dilated(context.system) - override val processorId: String = name val channel = context.actorOf(Channel.props(s"${name}-channel")) @@ -106,10 +107,11 @@ object ViewSpec { } class SnapshottingView(name: String, probe: ActorRef) extends View { - override def autoUpdateInterval: FiniteDuration = 100.microseconds.dilated(context.system) - override val processorId: String = name + override val persistenceId: String = name override val viewId: String = s"${name}-replicator" + override def autoUpdateInterval: FiniteDuration = 100.microseconds.dilated(context.system) + var last: String = _ def receive = { diff --git a/akka-persistence/src/test/scala/akka/persistence/journal/chaos/ChaosJournal.scala b/akka-persistence/src/test/scala/akka/persistence/journal/chaos/ChaosJournal.scala index f7b45a3907..e41ca93fdb 100644 --- a/akka-persistence/src/test/scala/akka/persistence/journal/chaos/ChaosJournal.scala +++ b/akka-persistence/src/test/scala/akka/persistence/journal/chaos/ChaosJournal.scala @@ -16,7 +16,7 @@ class WriteFailedException(ps: Seq[PersistentRepr]) extends TestException(s"write failed for payloads = [${ps.map(_.payload)}]") class ConfirmFailedException(cs: Seq[PersistentConfirmation]) - extends TestException(s"write failed for confirmations = [${cs.map(c ⇒ s"${c.processorId}-${c.sequenceNr}-${c.channelId}")}]") + extends TestException(s"write failed for confirmations = [${cs.map(c ⇒ s"${c.persistenceId}-${c.sequenceNr}-${c.channelId}")}]") class ReplayFailedException(ps: Seq[PersistentRepr]) extends TestException(s"recovery failed after replaying payloads = [${ps.map(_.payload)}]") @@ -24,7 +24,7 @@ class ReplayFailedException(ps: Seq[PersistentRepr]) class ReadHighestFailedException extends TestException(s"recovery failed when reading highest sequence number") -class DeleteFailedException(messageIds: immutable.Seq[PersistentId]) +class DeleteFailedException(messageIds: immutable.Seq[PersistenceId]) extends TestException(s"delete failed for message ids = [${messageIds}]") /** @@ -51,30 +51,30 @@ class ChaosJournal extends SyncWriteJournal { def writeConfirmations(confirmations: immutable.Seq[PersistentConfirmation]): Unit = if (shouldFail(confirmFailureRate)) throw new ConfirmFailedException(confirmations) - else confirmations.foreach(cnf ⇒ update(cnf.processorId, cnf.sequenceNr)(p ⇒ p.update(confirms = cnf.channelId +: p.confirms))) + else confirmations.foreach(cnf ⇒ update(cnf.persistenceId, cnf.sequenceNr)(p ⇒ p.update(confirms = cnf.channelId +: p.confirms))) - def deleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean): Unit = + def deleteMessages(messageIds: immutable.Seq[PersistenceId], permanent: Boolean): Unit = if (shouldFail(deleteFailureRate)) throw new DeleteFailedException(messageIds) - else if (permanent) messageIds.foreach(mid ⇒ update(mid.processorId, mid.sequenceNr)(_.update(deleted = true))) - else messageIds.foreach(mid ⇒ del(mid.processorId, mid.sequenceNr)) + else if (permanent) messageIds.foreach(mid ⇒ update(mid.persistenceId, mid.sequenceNr)(_.update(deleted = true))) + else messageIds.foreach(mid ⇒ del(mid.persistenceId, mid.sequenceNr)) - def deleteMessagesTo(processorId: String, toSequenceNr: Long, permanent: Boolean): Unit = - (1L to toSequenceNr).map(PersistentIdImpl(processorId, _)) + def deleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Unit = + (1L to toSequenceNr).map(PersistenceIdImpl(persistenceId, _)) - def asyncReplayMessages(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: (PersistentRepr) ⇒ Unit): Future[Unit] = + def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: (PersistentRepr) ⇒ Unit): Future[Unit] = if (shouldFail(replayFailureRate)) { - val rm = read(processorId, fromSequenceNr, toSequenceNr, max) + val rm = read(persistenceId, fromSequenceNr, toSequenceNr, max) val sm = rm.take(random.nextInt(rm.length + 1)) sm.foreach(replayCallback) Future.failed(new ReplayFailedException(sm)) } else { - read(processorId, fromSequenceNr, toSequenceNr, max).foreach(replayCallback) + read(persistenceId, fromSequenceNr, toSequenceNr, max).foreach(replayCallback) Future.successful(()) } - def asyncReadHighestSequenceNr(processorId: String, fromSequenceNr: Long): Future[Long] = + def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = if (shouldFail(readHighestFailureRate)) Future.failed(new ReadHighestFailedException) - else Future.successful(highestSequenceNr(processorId)) + else Future.successful(highestSequenceNr(persistenceId)) def shouldFail(rate: Double): Boolean = random.nextDouble() < rate diff --git a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala index 91f5797f45..61ce089fcf 100644 --- a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala @@ -124,11 +124,11 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) { deserialized should be(confirmation) } "handle DeliveredByPersistentChannel message serialization" in { - val confirmation = DeliveredByPersistentChannel("c2", 14) + val confirmation = DeliveredByPersistenceChannel("c2", 14) val serializer = serialization.findSerializerFor(confirmation) val bytes = serializer.toBinary(confirmation) - val deserialized = serializer.fromBinary(bytes, Some(classOf[DeliveredByPersistentChannel])) + val deserialized = serializer.fromBinary(bytes, Some(classOf[DeliveredByPersistenceChannel])) deserialized should be(confirmation) } @@ -149,7 +149,7 @@ object MessageSerializerRemotingSpec { case ConfirmablePersistent(MyPayload(data), _, _) ⇒ sender() ! s"c${data}" case Persistent(MyPayload(data), _) ⇒ sender() ! s"p${data}" case DeliveredByChannel(pid, cid, msnr, dsnr, ep) ⇒ sender() ! s"${pid},${cid},${msnr},${dsnr},${ep.path.name.startsWith("testActor")}" - case DeliveredByPersistentChannel(cid, msnr, dsnr, ep) ⇒ sender() ! s"${cid},${msnr},${dsnr},${ep.path.name.startsWith("testActor")}" + case DeliveredByPersistenceChannel(cid, msnr, dsnr, ep) ⇒ sender() ! s"${cid},${msnr},${dsnr},${ep.path.name.startsWith("testActor")}" case Deliver(Persistent(payload, _), dp) ⇒ context.actorSelection(dp) ! payload } } @@ -194,7 +194,7 @@ class MessageSerializerRemotingSpec extends AkkaSpec(remote.withFallback(customS expectMsg("a,b,2,3,true") } "serialize DeliveredByPersistentChannel messages during remoting" in { - localActor ! DeliveredByPersistentChannel("c", 2, 3, testActor) + localActor ! DeliveredByPersistenceChannel("c", 2, 3, testActor) expectMsg("c,2,3,true") } "serialize Deliver messages during remoting" in { diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java index a943427f2f..09e6753e24 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java @@ -24,9 +24,9 @@ public class LambdaPersistenceDocTest { public interface SomeOtherMessage {} public interface ProcessorMethods { - //#processor-id - public String processorId(); - //#processor-id + //#persistence-id + public String persistenceId(); + //#persistence-id //#recovery-status public boolean recoveryRunning(); public boolean recoveryFinished(); @@ -121,13 +121,13 @@ public class LambdaPersistenceDocTest { } class MyProcessor4 extends AbstractProcessor implements ProcessorMethods { - //#processor-id-override + //#persistence-id-override @Override - public String processorId() { - return "my-stable-processor-id"; + public String persistenceId() { + return "my-stable-persistence-id"; } - //#processor-id-override + //#persistence-id-override public MyProcessor4() { receive(ReceiveBuilder. match(Persistent.class, received -> {/* ... */}).build() @@ -494,8 +494,8 @@ public class LambdaPersistenceDocTest { //#view class MyView extends AbstractView { @Override - public String processorId() { - return "some-processor-id"; + public String persistenceId() { + return "some-persistence-id"; } public MyView() { diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java index 3999650804..d3cd3998c8 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistencePluginDocTest.java @@ -6,7 +6,6 @@ package doc; //#plugin-imports import akka.japi.pf.ReceiveBuilder; -import scala.PartialFunction; import scala.concurrent.Future; import akka.japi.Option; import akka.japi.Procedure; @@ -17,7 +16,6 @@ import akka.persistence.snapshot.japi.*; import akka.actor.*; import akka.persistence.journal.leveldb.SharedLeveldbJournal; import akka.persistence.journal.leveldb.SharedLeveldbStore; -import scala.runtime.BoxedUnit; public class LambdaPersistencePluginDocTest { @@ -55,7 +53,7 @@ public class LambdaPersistencePluginDocTest { class MySnapshotStore extends SnapshotStore { @Override - public Future> doLoadAsync(String processorId, SnapshotSelectionCriteria criteria) { + public Future> doLoadAsync(String persistenceId, SnapshotSelectionCriteria criteria) { return null; } @@ -73,7 +71,7 @@ public class LambdaPersistencePluginDocTest { } @Override - public void doDelete(String processorId, SnapshotSelectionCriteria criteria) throws Exception { + public void doDelete(String persistenceId, SnapshotSelectionCriteria criteria) throws Exception { } } @@ -89,17 +87,17 @@ public class LambdaPersistencePluginDocTest { } @Override - public Future doAsyncDeleteMessages(Iterable messageIds, boolean permanent) { + public Future doAsyncDeleteMessages(Iterable messageIds, boolean permanent) { return null; } @Override - public Future doAsyncDeleteMessagesTo(String processorId, long toSequenceNr, boolean permanent) { + public Future doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr, boolean permanent) { return null; } @Override - public Future doAsyncReplayMessages(String processorId, long fromSequenceNr, + public Future doAsyncReplayMessages(String persistenceId, long fromSequenceNr, long toSequenceNr, long max, Procedure replayCallback) { @@ -107,7 +105,7 @@ public class LambdaPersistencePluginDocTest { } @Override - public Future doAsyncReadHighestSequenceNr(String processorId, long fromSequenceNr) { + public Future doAsyncReadHighestSequenceNr(String persistenceId, long fromSequenceNr) { return null; } } diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ViewExample.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ViewExample.java index 7344f5efb5..2fe074111e 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ViewExample.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/sample/persistence/ViewExample.java @@ -45,7 +45,7 @@ public class ViewExample { } @Override - public String processorId() { + public String persistenceId() { return "processor-5"; } diff --git a/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/ViewExample.java b/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/ViewExample.java index c231b3699a..b132a5d652 100644 --- a/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/ViewExample.java +++ b/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/ViewExample.java @@ -35,7 +35,7 @@ public class ViewExample { } @Override - public String processorId() { + public String persistenceId() { return "processor-5"; } diff --git a/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ViewExample.scala b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ViewExample.scala index 4699a7fbbc..edaaec1ac5 100644 --- a/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ViewExample.scala +++ b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ViewExample.scala @@ -18,7 +18,7 @@ object ViewExample extends App { class ExampleView extends View { private var numReplicated = 0 - override def processorId = "processor-5" + override def persistenceId: String = "processor-5" override def viewId = "view-5" private val destination = context.actorOf(Props[ExampleDestination]) @@ -35,6 +35,7 @@ object ViewExample extends App { println(s"view received ${payload} (sequence nr = ${sequenceNr}, num replicated = ${numReplicated})") channel ! Deliver(Persistent(s"replicated-${payload}"), destination.path) } + } class ExampleDestination extends Actor {