diff --git a/akka-persistence/src/main/resources/reference.conf b/akka-persistence/src/main/resources/reference.conf index c608c78387..9a4228c8b2 100644 --- a/akka-persistence/src/main/resources/reference.conf +++ b/akka-persistence/src/main/resources/reference.conf @@ -27,9 +27,6 @@ akka { journal { # Maximum size of a persistent message batch written to the journal. - # Only applies to internally created batches by processors that receive - # persistent messages individually. Application-defined batches, even if - # larger than this setting, are always written as a single isolated batch. max-message-batch-size = 200 # Maximum size of a confirmation batch written to the journal. diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 1cf3d531a8..cad94a987b 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -164,14 +164,14 @@ private[persistence] trait Eventsourced extends ProcessorImpl { receiveRecover(RecoveryCompleted) } - sealed trait PendingHandlerInvocation { + private sealed trait PendingHandlerInvocation { def evt: Any def handler: Any ⇒ Unit } /** forces processor to stash incoming commands untill all these invocations are handled */ - final case class StashingHandlerInvocation(evt: Any, handler: Any ⇒ Unit) extends PendingHandlerInvocation + private final case class StashingHandlerInvocation(evt: Any, handler: Any ⇒ Unit) extends PendingHandlerInvocation /** does not force the processor to stash commands; Originates from either `persistAsync` or `defer` calls */ - final case class AsyncHandlerInvocation(evt: Any, handler: Any ⇒ Unit) extends PendingHandlerInvocation + private final case class AsyncHandlerInvocation(evt: Any, handler: Any ⇒ Unit) extends PendingHandlerInvocation /** Used instead of iterating `pendingInvocations` in order to check if safe to revert to processing commands */ private var pendingStashingPersistInvocations: Long = 0 @@ -199,21 +199,20 @@ private[persistence] trait Eventsourced extends ProcessorImpl { /** * Asynchronously persists `event`. On successful persistence, `handler` is called with the - * persisted event. It is guaranteed that no new commands will be received by a processor + * persisted event. It is guaranteed that no new commands will be received by a persistent actor * between a call to `persist` and the execution of its `handler`. This also holds for * multiple `persist` calls per received command. Internally, this is achieved by stashing new * commands and unstashing them when the `event` has been persisted and handled. The stash used - * for that is an internal stash which doesn't interfere with the user stash inherited from - * [[Processor]]. + * for that is an internal stash which doesn't interfere with the inherited user stash. * - * An event `handler` may close over processor state and modify it. The `sender` of a persisted + * An event `handler` may close over persistent actor state and modify it. The `sender` of a persisted * event is the sender of the corresponding command. This means that one can reply to a command * sender within an event `handler`. * - * Within an event handler, applications usually update processor state using persisted event + * Within an event handler, applications usually update persistent actor state using persisted event * data, notify listeners and reply to command senders. * - * If persistence of an event fails, the processor will be stopped. This can be customized by + * If persistence of an event fails, the persistent actor will be stopped. This can be customized by * handling [[PersistenceFailure]] in [[receiveCommand]]. * * @param event event to be persisted @@ -241,16 +240,16 @@ private[persistence] trait Eventsourced extends ProcessorImpl { * Asynchronously persists `event`. On successful persistence, `handler` is called with the * persisted event. * - * Unlike `persist` the processor will continue to receive incomming commands between the + * Unlike `persist` the persistent actor will continue to receive incomming commands between the * call to `persist` and executing it's `handler`. This asynchronous, non-stashing, version of * of persist should be used when you favor throughput over the "command-2 only processed after * command-1 effects' have been applied" guarantee, which is provided by the plain [[persist]] method. * - * An event `handler` may close over processor state and modify it. The `sender` of a persisted + * An event `handler` may close over persistent actor state and modify it. The `sender` of a persisted * event is the sender of the corresponding command. This means that one can reply to a command * sender within an event `handler`. * - * If persistence of an event fails, the processor will be stopped. This can be customized by + * If persistence of an event fails, the persistent actor will be stopped. This can be customized by * handling [[PersistenceFailure]] in [[receiveCommand]]. * * @param event event to be persisted @@ -327,7 +326,7 @@ private[persistence] trait Eventsourced extends ProcessorImpl { * has been captured and saved, this handler will receive a [[SnapshotOffer]] message * followed by events that are younger than the offered snapshot. * - * This handler must not have side-effects other than changing processor state i.e. it + * This handler must not have side-effects other than changing persistent actor state i.e. it * should not perform actions that may fail, such as interacting with external services, * for example. * @@ -342,7 +341,7 @@ private[persistence] trait Eventsourced extends ProcessorImpl { * Command handler. Typically validates commands against current state (and/or by * communication with other actors). On successful validation, one or more events are * derived from a command and these events are then persisted by calling `persist`. - * Commands sent to event sourced processors should not be [[Persistent]] messages. + * Commands sent to event sourced persistent actors should not be [[Persistent]] messages. */ def receiveCommand: Receive @@ -420,21 +419,20 @@ abstract class UntypedPersistentActor extends UntypedActor with ProcessorImpl wi /** * Java API: asynchronously persists `event`. On successful persistence, `handler` is called with the - * persisted event. It is guaranteed that no new commands will be received by a processor + * persisted event. It is guaranteed that no new commands will be received by a persistent actor * between a call to `persist` and the execution of its `handler`. This also holds for * multiple `persist` calls per received command. Internally, this is achieved by stashing new * commands and unstashing them when the `event` has been persisted and handled. The stash used - * for that is an internal stash which doesn't interfere with the user stash inherited from - * [[UntypedProcessor]]. + * for that is an internal stash which doesn't interfere with the inherited user stash. * - * An event `handler` may close over processor state and modify it. The `getSender()` of a persisted + * An event `handler` may close over persistent actor state and modify it. The `getSender()` of a persisted * event is the sender of the corresponding command. This means that one can reply to a command * sender within an event `handler`. * - * Within an event handler, applications usually update processor state using persisted event + * Within an event handler, applications usually update persistent actor state using persisted event * data, notify listeners and reply to command senders. * - * If persistence of an event fails, the processor will be stopped. This can be customized by + * If persistence of an event fails, the persistent actor will be stopped. This can be customized by * handling [[PersistenceFailure]] in [[onReceiveCommand]]. * * @param event event to be persisted. @@ -458,16 +456,16 @@ abstract class UntypedPersistentActor extends UntypedActor with ProcessorImpl wi * JAVA API: asynchronously persists `event`. On successful persistence, `handler` is called with the * persisted event. * - * Unlike `persist` the processor will continue to receive incomming commands between the + * Unlike `persist` the persistent actor will continue to receive incomming commands between the * call to `persist` and executing it's `handler`. This asynchronous, non-stashing, version of * of persist should be used when you favor throughput over the "command-2 only processed after * command-1 effects' have been applied" guarantee, which is provided by the plain [[persist]] method. * - * An event `handler` may close over processor state and modify it. The `sender` of a persisted + * An event `handler` may close over persistent actor state and modify it. The `sender` of a persisted * event is the sender of the corresponding command. This means that one can reply to a command * sender within an event `handler`. * - * If persistence of an event fails, the processor will be stopped. This can be customized by + * If persistence of an event fails, the persistent actor will be stopped. This can be customized by * handling [[PersistenceFailure]] in [[receiveCommand]]. * * @param event event to be persisted @@ -536,7 +534,7 @@ abstract class UntypedPersistentActor extends UntypedActor with ProcessorImpl wi * has been captured and saved, this handler will receive a [[SnapshotOffer]] message * followed by events that are younger than the offered snapshot. * - * This handler must not have side-effects other than changing processor state i.e. it + * This handler must not have side-effects other than changing persistent actor state i.e. it * should not perform actions that may fail, such as interacting with external services, * for example. * @@ -552,9 +550,9 @@ abstract class UntypedPersistentActor extends UntypedActor with ProcessorImpl wi * Java API: command handler. Typically validates commands against current state (and/or by * communication with other actors). On successful validation, one or more events are * derived from a command and these events are then persisted by calling `persist`. - * Commands sent to event sourced processors must not be [[Persistent]] or + * Commands sent to event sourced persistent actors must not be [[Persistent]] or * [[PersistentBatch]] messages. In this case an `UnsupportedOperationException` is - * thrown by the processor. + * thrown by the persistent actor. */ @throws(classOf[Exception]) def onReceiveCommand(msg: Any): Unit @@ -567,21 +565,20 @@ abstract class AbstractPersistentActor extends AbstractActor with PersistentActo /** * Java API: asynchronously persists `event`. On successful persistence, `handler` is called with the - * persisted event. It is guaranteed that no new commands will be received by a processor + * persisted event. It is guaranteed that no new commands will be received by a persistent actor * between a call to `persist` and the execution of its `handler`. This also holds for * multiple `persist` calls per received command. Internally, this is achieved by stashing new * commands and unstashing them when the `event` has been persisted and handled. The stash used - * for that is an internal stash which doesn't interfere with the user stash inherited from - * [[UntypedProcessor]]. + * for that is an internal stash which doesn't interfere with the inherited user stash. * - * An event `handler` may close over processor state and modify it. The `getSender()` of a persisted + * An event `handler` may close over persistent actor state and modify it. The `getSender()` of a persisted * event is the sender of the corresponding command. This means that one can reply to a command * sender within an event `handler`. * - * Within an event handler, applications usually update processor state using persisted event + * Within an event handler, applications usually update persistent actor state using persisted event * data, notify listeners and reply to command senders. * - * If persistence of an event fails, the processor will be stopped. This can be customized by + * If persistence of an event fails, the persistent actor will be stopped. This can be customized by * handling [[PersistenceFailure]] in [[receiveCommand]]. * * @param event event to be persisted. @@ -605,11 +602,11 @@ abstract class AbstractPersistentActor extends AbstractActor with PersistentActo * Java API: asynchronously persists `event`. On successful persistence, `handler` is called with the * persisted event. * - * Unlike `persist` the processor will continue to receive incomming commands between the + * Unlike `persist` the persistent actor will continue to receive incomming commands between the * call to `persistAsync` and executing it's `handler`. This asynchronous, non-stashing, version of * of persist should be used when you favor throughput over the strict ordering guarantees that `persist` guarantees. * - * If persistence of an event fails, the processor will be stopped. This can be customized by + * If persistence of an event fails, the persistent actor will be stopped. This can be customized by * handling [[PersistenceFailure]] in [[receiveCommand]]. * * @param event event to be persisted diff --git a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala index 9753d957ae..7b8ac204bd 100644 --- a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala +++ b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala @@ -11,7 +11,7 @@ import akka.actor._ /** * INTERNAL API. * - * Messages exchanged between processors, views, channels and a journal. + * Messages exchanged between persistent actors, views, channels and a journal. */ private[persistence] object JournalProtocol { /** @@ -56,9 +56,9 @@ private[persistence] object JournalProtocol { * Request to write messages. * * @param messages messages to be written. - * @param processor write requestor. + * @param persistentActor write requestor. */ - final case class WriteMessages(messages: immutable.Seq[Resequenceable], processor: ActorRef) + final case class WriteMessages(messages: immutable.Seq[Resequenceable], persistentActor: ActorRef) /** * Reply message to a successful [[WriteMessages]] request. This reply is sent to the requestor @@ -92,13 +92,13 @@ private[persistence] object JournalProtocol { final case class WriteMessageFailure(message: PersistentRepr, cause: Throwable) /** - * Request to loop a `message` back to `processor`, without persisting the message. Looping of messages + * Request to loop a `message` back to `persistent actor`, without persisting the message. Looping of messages * through a journal is required to preserve message order with persistent messages. * * @param message message to be looped through the journal. - * @param processor loop requestor. + * @param persistentActor loop requestor. */ - final case class LoopMessage(message: Any, processor: ActorRef) + final case class LoopMessage(message: Any, persistentActor: ActorRef) /** * Reply message to a [[LoopMessage]] request. @@ -108,16 +108,16 @@ private[persistence] object JournalProtocol { final case class LoopMessageSuccess(message: Any) /** - * Request to replay messages to `processor`. + * Request to replay messages to `persistentActor`. * * @param fromSequenceNr sequence number where replay should start (inclusive). * @param toSequenceNr sequence number where replay should end (inclusive). * @param max maximum number of messages to be replayed. - * @param persistenceId requesting processor id. - * @param processor requesting processor. + * @param persistenceId requesting persistent actor id. + * @param persistentActor requesting persistent actor. * @param replayDeleted `true` if messages marked as deleted shall be replayed. */ - final case class ReplayMessages(fromSequenceNr: Long, toSequenceNr: Long, max: Long, persistenceId: String, processor: ActorRef, replayDeleted: Boolean = false) + final case class ReplayMessages(fromSequenceNr: Long, toSequenceNr: Long, max: Long, persistenceId: String, persistentActor: ActorRef, replayDeleted: Boolean = false) /** * Reply message to a [[ReplayMessages]] request. A separate reply is sent to the requestor for each @@ -140,13 +140,13 @@ private[persistence] object JournalProtocol { final case class ReplayMessagesFailure(cause: Throwable) /** - * Request to read the highest stored sequence number of a given processor. + * Request to read the highest stored sequence number of a given persistent actor. * * @param fromSequenceNr optional hint where to start searching for the maximum sequence number. - * @param persistenceId requesting processor id. - * @param processor requesting processor. + * @param persistenceId requesting persistent actor id. + * @param persistentActor requesting persistent actor. */ - final case class ReadHighestSequenceNr(fromSequenceNr: Long = 1L, persistenceId: String, processor: ActorRef) + final case class ReadHighestSequenceNr(fromSequenceNr: Long = 1L, persistenceId: String, persistentActor: ActorRef) /** * Reply message to a successful [[ReadHighestSequenceNr]] request. diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala index 91e3124dd5..ade35a8c85 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala @@ -15,17 +15,22 @@ import akka.pattern.PromiseActorRef import akka.persistence.serialization.Message /** + * INTERNAL API + * * Marks messages which can be resequenced by the [[akka.persistence.journal.AsyncWriteJournal]]. * * In essence it is either an [[NonPersistentRepr]] or [[Persistent]]. */ -sealed trait Resequenceable { +private[persistence] sealed trait Resequenceable { def payload: Any def sender: ActorRef } -/** Message which can be resequenced by the Journal, but will not be persisted. */ -final case class NonPersistentRepr(payload: Any, sender: ActorRef) extends Resequenceable +/** + * INTERNAL API + * Message which can be resequenced by the Journal, but will not be persisted. + */ +private[persistence] final case class NonPersistentRepr(payload: Any, sender: ActorRef) extends Resequenceable /** Persistent message. */ @deprecated("Use akka.persistence.PersistentActor instead.", since = "2.3.4") @@ -124,11 +129,22 @@ object ConfirmablePersistent { * journal. The processor receives the written messages individually as [[Persistent]] messages. * During recovery, they are also replayed individually. */ +@deprecated("Use akka.persistence.PersistentActor instead", since = "2.3.4") case class PersistentBatch(batch: immutable.Seq[Resequenceable]) extends Message +@deprecated("Use akka.persistence.PersistentActor instead", since = "2.3.4") +object PersistentBatch { + /** + * Java API. + */ + def create(persistentBatch: JIterable[Persistent]) = + PersistentBatch(immutableSeq(persistentBatch)) +} + /** * Plugin API: confirmation entry written by journal plugins. */ +@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4") trait PersistentConfirmation { @deprecated("Use `persistenceId` instead. Processor will be removed.", since = "2.3.4") final def processorId: String = persistenceId @@ -197,33 +213,39 @@ trait PersistentRepr extends Persistent with Resequenceable with PersistenceId w * Number of redeliveries. Only greater than zero if message has been redelivered by a [[Channel]] * or [[PersistentChannel]]. */ + @deprecated("Channel will be removed.", since = "2.3.4") def redeliveries: Int /** * Channel ids of delivery confirmations that are available for this message. Only non-empty * for replayed messages. */ + @deprecated("Channel will be removed.", since = "2.3.4") def confirms: immutable.Seq[String] /** * Java API, Plugin API: channel ids of delivery confirmations that are available for this * message. Only non-empty for replayed messages. */ + @deprecated("Channel will be removed.", since = "2.3.4") def getConfirms: JList[String] = confirms.asJava /** * `true` only if this message has been delivered by a channel. */ + @deprecated("Channel will be removed.", since = "2.3.4") def confirmable: Boolean /** * Delivery confirmation message. */ + @deprecated("Channel will be removed.", since = "2.3.4") def confirmMessage: Delivered /** * Delivery confirmation message. */ + @deprecated("Channel will be removed.", since = "2.3.4") def confirmTarget: ActorRef /** @@ -249,10 +271,10 @@ trait PersistentRepr extends Persistent with Resequenceable with PersistenceId w sequenceNr: Long = sequenceNr, @deprecatedName('processorId) persistenceId: String = persistenceId, deleted: Boolean = deleted, - redeliveries: Int = redeliveries, - confirms: immutable.Seq[String] = confirms, - confirmMessage: Delivered = confirmMessage, - confirmTarget: ActorRef = confirmTarget, + @deprecated("Channel will be removed.", since = "2.3.4") redeliveries: Int = redeliveries, + @deprecated("Channel will be removed.", since = "2.3.4") confirms: immutable.Seq[String] = confirms, + @deprecated("Channel will be removed.", since = "2.3.4") confirmMessage: Delivered = confirmMessage, + @deprecated("Channel will be removed.", since = "2.3.4") confirmTarget: ActorRef = confirmTarget, sender: ActorRef = sender): PersistentRepr } @@ -270,11 +292,11 @@ object PersistentRepr { sequenceNr: Long = 0L, @deprecatedName('processorId) persistenceId: String = PersistentRepr.Undefined, deleted: Boolean = false, - redeliveries: Int = 0, - confirms: immutable.Seq[String] = Nil, - confirmable: Boolean = false, - confirmMessage: Delivered = null, - confirmTarget: ActorRef = null, + @deprecated("Channel will be removed.", since = "2.3.4") redeliveries: Int = 0, + @deprecated("Channel will be removed.", since = "2.3.4") confirms: immutable.Seq[String] = Nil, + @deprecated("Channel will be removed.", since = "2.3.4") confirmable: Boolean = false, + @deprecated("Channel will be removed.", since = "2.3.4") confirmMessage: Delivered = null, + @deprecated("Channel will be removed.", since = "2.3.4") confirmTarget: ActorRef = null, sender: ActorRef = null) = if (confirmable) ConfirmablePersistentImpl(payload, sequenceNr, persistenceId, deleted, redeliveries, confirms, confirmMessage, confirmTarget, sender) else PersistentImpl(payload, sequenceNr, persistenceId, deleted, confirms, sender) @@ -285,14 +307,6 @@ object PersistentRepr { def create = apply _ } -object PersistentBatch { - /** - * Java API. - */ - def create(persistentBatch: JIterable[Persistent]) = - PersistentBatch(immutableSeq(persistentBatch)) -} - /** * INTERNAL API. */ diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala b/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala index 1ad88fac48..a7afe009bc 100644 --- a/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala +++ b/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala @@ -48,7 +48,7 @@ object Update { /** * A view replicates the persistent message stream of a [[PersistentActor]]. Implementation classes receive * the message stream directly from the Journal. These messages can be processed to update internal state - * in order to maintain an (eventual consistent) view of the state of the corresponding processor. A + * in order to maintain an (eventual consistent) view of the state of the corresponding persistent actor. A * persistent view can also run on a different node, provided that a replicated journal is used. * * Implementation classes refer to a persistent actors' message stream by implementing `persistenceId` diff --git a/akka-persistence/src/main/scala/akka/persistence/Recovery.scala b/akka-persistence/src/main/scala/akka/persistence/Recovery.scala index 2783cd1a62..c697b8594e 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Recovery.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Recovery.scala @@ -14,8 +14,8 @@ import scala.util.control.NonFatal /** * Recovery state machine that loads snapshots and replays messages. * - * @see [[Processor]] - * @see [[View]] + * @see [[PersistentActor]] + * @see [[PersistentView]] */ trait Recovery extends Actor with Snapshotter with Stash with StashFactory { /** @@ -187,11 +187,13 @@ trait Recovery extends Actor with Snapshotter with Stash with StashFactory { /** * Returns the current persistent message if there is any. */ + @deprecated("currentPersistentMessage will be removed, sequence number can be retrieved with `lastSequenceNr`.", since = "2.3.4") implicit def currentPersistentMessage: Option[Persistent] = Option(_currentPersistent) /** * Java API: returns the current persistent message or `null` if there is none. */ + @deprecated("getCurrentPersistentMessage will be removed, sequence number can be retrieved with `lastSequenceNr`.", since = "2.3.4") def getCurrentPersistentMessage = currentPersistentMessage.getOrElse(null) /** @@ -256,12 +258,12 @@ trait Recovery extends Actor with Snapshotter with Stash with StashFactory { } /** - * Instructs a processor to recover itself. Recovery will start from a snapshot if the processor has + * Instructs a persistent actor to recover itself. Recovery will start from a snapshot if the persistent actor has * previously saved one or more snapshots and at least one of these snapshots matches the specified * `fromSnapshot` criteria. Otherwise, recovery will start from scratch by replaying all journaled * messages. * - * If recovery starts from a snapshot, the processor is offered that snapshot with a [[SnapshotOffer]] + * If recovery starts from a snapshot, the persistent actor is offered that snapshot with a [[SnapshotOffer]] * message, followed by replayed messages, if any, that are younger than the snapshot, up to the * specified upper sequence number bound (`toSequenceNr`). * diff --git a/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala b/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala index 0b5ac1a547..7bf341effc 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala @@ -8,7 +8,7 @@ package akka.persistence /** * Snapshot metadata. * - * @param persistenceId id of processor from which the snapshot was taken. + * @param persistenceId id of persistent actor from which the snapshot was taken. * @param sequenceNr sequence number at which the snapshot was taken. * @param timestamp time at which the snapshot was saved. */ @@ -17,7 +17,7 @@ final case class SnapshotMetadata(@deprecatedName('processorId) persistenceId: S //#snapshot-metadata /** - * Sent to a [[Processor]] after successful saving of a snapshot. + * Sent to a [[PersistentActor]] after successful saving of a snapshot. * * @param metadata snapshot metadata. */ @@ -25,7 +25,7 @@ final case class SnapshotMetadata(@deprecatedName('processorId) persistenceId: S final case class SaveSnapshotSuccess(metadata: SnapshotMetadata) /** - * Sent to a [[Processor]] after failed saving of a snapshot. + * Sent to a [[PersistentActor]] after failed saving of a snapshot. * * @param metadata snapshot metadata. * @param cause failure cause. @@ -34,7 +34,7 @@ final case class SaveSnapshotSuccess(metadata: SnapshotMetadata) final case class SaveSnapshotFailure(metadata: SnapshotMetadata, cause: Throwable) /** - * Offers a [[Processor]] a previously saved `snapshot` during recovery. This offer is received + * Offers a [[PersistentActor]] a previously saved `snapshot` during recovery. This offer is received * before any further replayed messages. */ @SerialVersionUID(1L) @@ -110,13 +110,13 @@ object SelectedSnapshot { /** * INTERNAL API. * - * Defines messages exchanged between processors and a snapshot store. + * Defines messages exchanged between persistent actors and a snapshot store. */ private[persistence] object SnapshotProtocol { /** * Instructs a snapshot store to load a snapshot. * - * @param persistenceId processor id. + * @param persistenceId persistent actor id. * @param criteria criteria for selecting a snapshot from which recovery should start. * @param toSequenceNr upper sequence number bound (inclusive) for recovery. */ @@ -147,7 +147,7 @@ private[persistence] object SnapshotProtocol { /** * Instructs snapshot store to delete all snapshots that match `criteria`. * - * @param persistenceId processor id. + * @param persistenceId persistent actor id. * @param criteria criteria for selecting snapshots to be deleted. */ final case class DeleteSnapshots(@deprecatedName('processorId) persistenceId: String, criteria: SnapshotSelectionCriteria) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala index 9724894df2..e16c497ff0 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala @@ -27,7 +27,7 @@ trait AsyncRecovery { * The channel ids of delivery confirmations that are available for a replayed * message must be contained in that message's `confirms` sequence. * - * @param persistenceId processor id. + * @param persistenceId persistent actor id. * @param fromSequenceNr sequence number where replay should start (inclusive). * @param toSequenceNr sequence number where replay should end (inclusive). * @param max maximum number of messages to be replayed. @@ -43,7 +43,7 @@ trait AsyncRecovery { * Plugin API: asynchronously reads the highest stored sequence number for the * given `persistenceId`. * - * @param persistenceId processor id. + * @param persistenceId persistent actor id. * @param fromSequenceNr hint where to start searching for the highest sequence * number. */ diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala index 51c1e2bd31..8f378955bf 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala @@ -34,7 +34,7 @@ private[persistence] class InmemJournal extends AsyncWriteProxy { * INTERNAL API. */ private[persistence] trait InmemMessages { - // processor id -> persistent message + // persistentActorId -> persistent message var messages = Map.empty[String, Vector[PersistentRepr]] def add(p: PersistentRepr) = messages = messages + (messages.get(p.persistenceId) match { diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbIdMapping.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbIdMapping.scala index c18abe99b9..b6fcf5ae49 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbIdMapping.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbIdMapping.scala @@ -11,7 +11,7 @@ import akka.actor.Actor /** * INTERNAL API. * - * LevelDB backed persistent mapping of `String`-based processor and channel ids to numeric ids. + * LevelDB backed persistent mapping of `String`-based persistent actor and channel ids to numeric ids. */ private[persistence] trait LeveldbIdMapping extends Actor { this: LeveldbStore ⇒ import Key._ @@ -20,7 +20,7 @@ private[persistence] trait LeveldbIdMapping extends Actor { this: LeveldbStore private var idMap: Map[String, Int] = Map.empty /** - * Get the mapped numeric id for the specified processor or channel `id`. Creates and + * Get the mapped numeric id for the specified persistent actor or channel `id`. Creates and * stores a new mapping if necessary. */ def numericId(id: String): Int = idMap.get(id) match { diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala index bc99a34324..533e500dc5 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala @@ -36,10 +36,6 @@ private[persistence] trait LeveldbStore extends Actor with LeveldbIdMapping with if (nativeLeveldb) org.fusesource.leveldbjni.JniDBFactory.factory else org.iq80.leveldb.impl.Iq80DBFactory.factory - // TODO: support migration of processor and channel ids - // needed if default processor and channel ids are used - // (actor paths, which contain deployment information). - val serialization = SerializationExtension(context.system) import Key._ diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala index 3203fc0068..c60eabbd7c 100644 --- a/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala @@ -39,15 +39,15 @@ trait SnapshotStore extends Actor { } to (self, p) case evt @ SaveSnapshotSuccess(metadata) ⇒ saved(metadata) - sender() ! evt // sender is processor + sender() ! evt // sender is persistentActor case evt @ SaveSnapshotFailure(metadata, _) ⇒ delete(metadata) - sender() ! evt // sender is processor + sender() ! evt // sender is persistentActor case d @ DeleteSnapshot(metadata) ⇒ delete(metadata) if (publish) context.system.eventStream.publish(d) - case d @ DeleteSnapshots(processorId, criteria) ⇒ - delete(processorId, criteria) + case d @ DeleteSnapshots(persistenceId, criteria) ⇒ + delete(persistenceId, criteria) if (publish) context.system.eventStream.publish(d) } @@ -55,7 +55,7 @@ trait SnapshotStore extends Actor { /** * Plugin API: asynchronously loads a snapshot. * - * @param processorId processor id. + * @param persistenceId processor id. * @param criteria selection criteria for loading. */ def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] diff --git a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala index 71f8910c5a..5885e25aa0 100644 --- a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala @@ -41,7 +41,7 @@ object AtLeastOnceDeliverySpec { destinations: Map[String, ActorPath]) extends PersistentActor with AtLeastOnceDelivery with ActorLogging { - override def processorId: String = name + override def persistenceId: String = name def updateState(evt: Evt): Unit = evt match { case AcceptedReq(payload, destination) ⇒