!per Additional deprecations and cleanup

(cherry picked from commit 98619647e372121fb3c2072a0eab20de4148f7d9)

Conflicts:
	akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala
	akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala
This commit is contained in:
Patrik Nordwall 2014-06-27 09:44:21 +02:00
parent 33027999da
commit f6aa491ef0
13 changed files with 104 additions and 98 deletions

View file

@ -27,9 +27,6 @@ akka {
journal {
# Maximum size of a persistent message batch written to the journal.
# Only applies to internally created batches by processors that receive
# persistent messages individually. Application-defined batches, even if
# larger than this setting, are always written as a single isolated batch.
max-message-batch-size = 200
# Maximum size of a confirmation batch written to the journal.

View file

@ -164,14 +164,14 @@ private[persistence] trait Eventsourced extends ProcessorImpl {
receiveRecover(RecoveryCompleted)
}
sealed trait PendingHandlerInvocation {
private sealed trait PendingHandlerInvocation {
def evt: Any
def handler: Any Unit
}
/** forces processor to stash incoming commands untill all these invocations are handled */
final case class StashingHandlerInvocation(evt: Any, handler: Any Unit) extends PendingHandlerInvocation
private final case class StashingHandlerInvocation(evt: Any, handler: Any Unit) extends PendingHandlerInvocation
/** does not force the processor to stash commands; Originates from either `persistAsync` or `defer` calls */
final case class AsyncHandlerInvocation(evt: Any, handler: Any Unit) extends PendingHandlerInvocation
private final case class AsyncHandlerInvocation(evt: Any, handler: Any Unit) extends PendingHandlerInvocation
/** Used instead of iterating `pendingInvocations` in order to check if safe to revert to processing commands */
private var pendingStashingPersistInvocations: Long = 0
@ -199,21 +199,20 @@ private[persistence] trait Eventsourced extends ProcessorImpl {
/**
* Asynchronously persists `event`. On successful persistence, `handler` is called with the
* persisted event. It is guaranteed that no new commands will be received by a processor
* persisted event. It is guaranteed that no new commands will be received by a persistent actor
* between a call to `persist` and the execution of its `handler`. This also holds for
* multiple `persist` calls per received command. Internally, this is achieved by stashing new
* commands and unstashing them when the `event` has been persisted and handled. The stash used
* for that is an internal stash which doesn't interfere with the user stash inherited from
* [[Processor]].
* for that is an internal stash which doesn't interfere with the inherited user stash.
*
* An event `handler` may close over processor state and modify it. The `sender` of a persisted
* An event `handler` may close over persistent actor state and modify it. The `sender` of a persisted
* event is the sender of the corresponding command. This means that one can reply to a command
* sender within an event `handler`.
*
* Within an event handler, applications usually update processor state using persisted event
* Within an event handler, applications usually update persistent actor state using persisted event
* data, notify listeners and reply to command senders.
*
* If persistence of an event fails, the processor will be stopped. This can be customized by
* If persistence of an event fails, the persistent actor will be stopped. This can be customized by
* handling [[PersistenceFailure]] in [[receiveCommand]].
*
* @param event event to be persisted
@ -241,16 +240,16 @@ private[persistence] trait Eventsourced extends ProcessorImpl {
* Asynchronously persists `event`. On successful persistence, `handler` is called with the
* persisted event.
*
* Unlike `persist` the processor will continue to receive incomming commands between the
* Unlike `persist` the persistent actor will continue to receive incomming commands between the
* call to `persist` and executing it's `handler`. This asynchronous, non-stashing, version of
* of persist should be used when you favor throughput over the "command-2 only processed after
* command-1 effects' have been applied" guarantee, which is provided by the plain [[persist]] method.
*
* An event `handler` may close over processor state and modify it. The `sender` of a persisted
* An event `handler` may close over persistent actor state and modify it. The `sender` of a persisted
* event is the sender of the corresponding command. This means that one can reply to a command
* sender within an event `handler`.
*
* If persistence of an event fails, the processor will be stopped. This can be customized by
* If persistence of an event fails, the persistent actor will be stopped. This can be customized by
* handling [[PersistenceFailure]] in [[receiveCommand]].
*
* @param event event to be persisted
@ -327,7 +326,7 @@ private[persistence] trait Eventsourced extends ProcessorImpl {
* has been captured and saved, this handler will receive a [[SnapshotOffer]] message
* followed by events that are younger than the offered snapshot.
*
* This handler must not have side-effects other than changing processor state i.e. it
* This handler must not have side-effects other than changing persistent actor state i.e. it
* should not perform actions that may fail, such as interacting with external services,
* for example.
*
@ -342,7 +341,7 @@ private[persistence] trait Eventsourced extends ProcessorImpl {
* Command handler. Typically validates commands against current state (and/or by
* communication with other actors). On successful validation, one or more events are
* derived from a command and these events are then persisted by calling `persist`.
* Commands sent to event sourced processors should not be [[Persistent]] messages.
* Commands sent to event sourced persistent actors should not be [[Persistent]] messages.
*/
def receiveCommand: Receive
@ -420,21 +419,20 @@ abstract class UntypedPersistentActor extends UntypedActor with ProcessorImpl wi
/**
* Java API: asynchronously persists `event`. On successful persistence, `handler` is called with the
* persisted event. It is guaranteed that no new commands will be received by a processor
* persisted event. It is guaranteed that no new commands will be received by a persistent actor
* between a call to `persist` and the execution of its `handler`. This also holds for
* multiple `persist` calls per received command. Internally, this is achieved by stashing new
* commands and unstashing them when the `event` has been persisted and handled. The stash used
* for that is an internal stash which doesn't interfere with the user stash inherited from
* [[UntypedProcessor]].
* for that is an internal stash which doesn't interfere with the inherited user stash.
*
* An event `handler` may close over processor state and modify it. The `getSender()` of a persisted
* An event `handler` may close over persistent actor state and modify it. The `getSender()` of a persisted
* event is the sender of the corresponding command. This means that one can reply to a command
* sender within an event `handler`.
*
* Within an event handler, applications usually update processor state using persisted event
* Within an event handler, applications usually update persistent actor state using persisted event
* data, notify listeners and reply to command senders.
*
* If persistence of an event fails, the processor will be stopped. This can be customized by
* If persistence of an event fails, the persistent actor will be stopped. This can be customized by
* handling [[PersistenceFailure]] in [[onReceiveCommand]].
*
* @param event event to be persisted.
@ -458,16 +456,16 @@ abstract class UntypedPersistentActor extends UntypedActor with ProcessorImpl wi
* JAVA API: asynchronously persists `event`. On successful persistence, `handler` is called with the
* persisted event.
*
* Unlike `persist` the processor will continue to receive incomming commands between the
* Unlike `persist` the persistent actor will continue to receive incomming commands between the
* call to `persist` and executing it's `handler`. This asynchronous, non-stashing, version of
* of persist should be used when you favor throughput over the "command-2 only processed after
* command-1 effects' have been applied" guarantee, which is provided by the plain [[persist]] method.
*
* An event `handler` may close over processor state and modify it. The `sender` of a persisted
* An event `handler` may close over persistent actor state and modify it. The `sender` of a persisted
* event is the sender of the corresponding command. This means that one can reply to a command
* sender within an event `handler`.
*
* If persistence of an event fails, the processor will be stopped. This can be customized by
* If persistence of an event fails, the persistent actor will be stopped. This can be customized by
* handling [[PersistenceFailure]] in [[receiveCommand]].
*
* @param event event to be persisted
@ -536,7 +534,7 @@ abstract class UntypedPersistentActor extends UntypedActor with ProcessorImpl wi
* has been captured and saved, this handler will receive a [[SnapshotOffer]] message
* followed by events that are younger than the offered snapshot.
*
* This handler must not have side-effects other than changing processor state i.e. it
* This handler must not have side-effects other than changing persistent actor state i.e. it
* should not perform actions that may fail, such as interacting with external services,
* for example.
*
@ -552,9 +550,9 @@ abstract class UntypedPersistentActor extends UntypedActor with ProcessorImpl wi
* Java API: command handler. Typically validates commands against current state (and/or by
* communication with other actors). On successful validation, one or more events are
* derived from a command and these events are then persisted by calling `persist`.
* Commands sent to event sourced processors must not be [[Persistent]] or
* Commands sent to event sourced persistent actors must not be [[Persistent]] or
* [[PersistentBatch]] messages. In this case an `UnsupportedOperationException` is
* thrown by the processor.
* thrown by the persistent actor.
*/
@throws(classOf[Exception])
def onReceiveCommand(msg: Any): Unit
@ -567,21 +565,20 @@ abstract class AbstractPersistentActor extends AbstractActor with PersistentActo
/**
* Java API: asynchronously persists `event`. On successful persistence, `handler` is called with the
* persisted event. It is guaranteed that no new commands will be received by a processor
* persisted event. It is guaranteed that no new commands will be received by a persistent actor
* between a call to `persist` and the execution of its `handler`. This also holds for
* multiple `persist` calls per received command. Internally, this is achieved by stashing new
* commands and unstashing them when the `event` has been persisted and handled. The stash used
* for that is an internal stash which doesn't interfere with the user stash inherited from
* [[UntypedProcessor]].
* for that is an internal stash which doesn't interfere with the inherited user stash.
*
* An event `handler` may close over processor state and modify it. The `getSender()` of a persisted
* An event `handler` may close over persistent actor state and modify it. The `getSender()` of a persisted
* event is the sender of the corresponding command. This means that one can reply to a command
* sender within an event `handler`.
*
* Within an event handler, applications usually update processor state using persisted event
* Within an event handler, applications usually update persistent actor state using persisted event
* data, notify listeners and reply to command senders.
*
* If persistence of an event fails, the processor will be stopped. This can be customized by
* If persistence of an event fails, the persistent actor will be stopped. This can be customized by
* handling [[PersistenceFailure]] in [[receiveCommand]].
*
* @param event event to be persisted.
@ -605,11 +602,11 @@ abstract class AbstractPersistentActor extends AbstractActor with PersistentActo
* Java API: asynchronously persists `event`. On successful persistence, `handler` is called with the
* persisted event.
*
* Unlike `persist` the processor will continue to receive incomming commands between the
* Unlike `persist` the persistent actor will continue to receive incomming commands between the
* call to `persistAsync` and executing it's `handler`. This asynchronous, non-stashing, version of
* of persist should be used when you favor throughput over the strict ordering guarantees that `persist` guarantees.
*
* If persistence of an event fails, the processor will be stopped. This can be customized by
* If persistence of an event fails, the persistent actor will be stopped. This can be customized by
* handling [[PersistenceFailure]] in [[receiveCommand]].
*
* @param event event to be persisted

View file

@ -11,7 +11,7 @@ import akka.actor._
/**
* INTERNAL API.
*
* Messages exchanged between processors, views, channels and a journal.
* Messages exchanged between persistent actors, views, channels and a journal.
*/
private[persistence] object JournalProtocol {
/**
@ -56,9 +56,9 @@ private[persistence] object JournalProtocol {
* Request to write messages.
*
* @param messages messages to be written.
* @param processor write requestor.
* @param persistentActor write requestor.
*/
final case class WriteMessages(messages: immutable.Seq[Resequenceable], processor: ActorRef)
final case class WriteMessages(messages: immutable.Seq[Resequenceable], persistentActor: ActorRef)
/**
* Reply message to a successful [[WriteMessages]] request. This reply is sent to the requestor
@ -92,13 +92,13 @@ private[persistence] object JournalProtocol {
final case class WriteMessageFailure(message: PersistentRepr, cause: Throwable)
/**
* Request to loop a `message` back to `processor`, without persisting the message. Looping of messages
* Request to loop a `message` back to `persistent actor`, without persisting the message. Looping of messages
* through a journal is required to preserve message order with persistent messages.
*
* @param message message to be looped through the journal.
* @param processor loop requestor.
* @param persistentActor loop requestor.
*/
final case class LoopMessage(message: Any, processor: ActorRef)
final case class LoopMessage(message: Any, persistentActor: ActorRef)
/**
* Reply message to a [[LoopMessage]] request.
@ -108,16 +108,16 @@ private[persistence] object JournalProtocol {
final case class LoopMessageSuccess(message: Any)
/**
* Request to replay messages to `processor`.
* Request to replay messages to `persistentActor`.
*
* @param fromSequenceNr sequence number where replay should start (inclusive).
* @param toSequenceNr sequence number where replay should end (inclusive).
* @param max maximum number of messages to be replayed.
* @param persistenceId requesting processor id.
* @param processor requesting processor.
* @param persistenceId requesting persistent actor id.
* @param persistentActor requesting persistent actor.
* @param replayDeleted `true` if messages marked as deleted shall be replayed.
*/
final case class ReplayMessages(fromSequenceNr: Long, toSequenceNr: Long, max: Long, persistenceId: String, processor: ActorRef, replayDeleted: Boolean = false)
final case class ReplayMessages(fromSequenceNr: Long, toSequenceNr: Long, max: Long, persistenceId: String, persistentActor: ActorRef, replayDeleted: Boolean = false)
/**
* Reply message to a [[ReplayMessages]] request. A separate reply is sent to the requestor for each
@ -140,13 +140,13 @@ private[persistence] object JournalProtocol {
final case class ReplayMessagesFailure(cause: Throwable)
/**
* Request to read the highest stored sequence number of a given processor.
* Request to read the highest stored sequence number of a given persistent actor.
*
* @param fromSequenceNr optional hint where to start searching for the maximum sequence number.
* @param persistenceId requesting processor id.
* @param processor requesting processor.
* @param persistenceId requesting persistent actor id.
* @param persistentActor requesting persistent actor.
*/
final case class ReadHighestSequenceNr(fromSequenceNr: Long = 1L, persistenceId: String, processor: ActorRef)
final case class ReadHighestSequenceNr(fromSequenceNr: Long = 1L, persistenceId: String, persistentActor: ActorRef)
/**
* Reply message to a successful [[ReadHighestSequenceNr]] request.

View file

@ -15,17 +15,22 @@ import akka.pattern.PromiseActorRef
import akka.persistence.serialization.Message
/**
* INTERNAL API
*
* Marks messages which can be resequenced by the [[akka.persistence.journal.AsyncWriteJournal]].
*
* In essence it is either an [[NonPersistentRepr]] or [[Persistent]].
*/
sealed trait Resequenceable {
private[persistence] sealed trait Resequenceable {
def payload: Any
def sender: ActorRef
}
/** Message which can be resequenced by the Journal, but will not be persisted. */
final case class NonPersistentRepr(payload: Any, sender: ActorRef) extends Resequenceable
/**
* INTERNAL API
* Message which can be resequenced by the Journal, but will not be persisted.
*/
private[persistence] final case class NonPersistentRepr(payload: Any, sender: ActorRef) extends Resequenceable
/** Persistent message. */
@deprecated("Use akka.persistence.PersistentActor instead.", since = "2.3.4")
@ -124,11 +129,22 @@ object ConfirmablePersistent {
* journal. The processor receives the written messages individually as [[Persistent]] messages.
* During recovery, they are also replayed individually.
*/
@deprecated("Use akka.persistence.PersistentActor instead", since = "2.3.4")
case class PersistentBatch(batch: immutable.Seq[Resequenceable]) extends Message
@deprecated("Use akka.persistence.PersistentActor instead", since = "2.3.4")
object PersistentBatch {
/**
* Java API.
*/
def create(persistentBatch: JIterable[Persistent]) =
PersistentBatch(immutableSeq(persistentBatch))
}
/**
* Plugin API: confirmation entry written by journal plugins.
*/
@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
trait PersistentConfirmation {
@deprecated("Use `persistenceId` instead. Processor will be removed.", since = "2.3.4")
final def processorId: String = persistenceId
@ -197,33 +213,39 @@ trait PersistentRepr extends Persistent with Resequenceable with PersistenceId w
* Number of redeliveries. Only greater than zero if message has been redelivered by a [[Channel]]
* or [[PersistentChannel]].
*/
@deprecated("Channel will be removed.", since = "2.3.4")
def redeliveries: Int
/**
* Channel ids of delivery confirmations that are available for this message. Only non-empty
* for replayed messages.
*/
@deprecated("Channel will be removed.", since = "2.3.4")
def confirms: immutable.Seq[String]
/**
* Java API, Plugin API: channel ids of delivery confirmations that are available for this
* message. Only non-empty for replayed messages.
*/
@deprecated("Channel will be removed.", since = "2.3.4")
def getConfirms: JList[String] = confirms.asJava
/**
* `true` only if this message has been delivered by a channel.
*/
@deprecated("Channel will be removed.", since = "2.3.4")
def confirmable: Boolean
/**
* Delivery confirmation message.
*/
@deprecated("Channel will be removed.", since = "2.3.4")
def confirmMessage: Delivered
/**
* Delivery confirmation message.
*/
@deprecated("Channel will be removed.", since = "2.3.4")
def confirmTarget: ActorRef
/**
@ -249,10 +271,10 @@ trait PersistentRepr extends Persistent with Resequenceable with PersistenceId w
sequenceNr: Long = sequenceNr,
@deprecatedName('processorId) persistenceId: String = persistenceId,
deleted: Boolean = deleted,
redeliveries: Int = redeliveries,
confirms: immutable.Seq[String] = confirms,
confirmMessage: Delivered = confirmMessage,
confirmTarget: ActorRef = confirmTarget,
@deprecated("Channel will be removed.", since = "2.3.4") redeliveries: Int = redeliveries,
@deprecated("Channel will be removed.", since = "2.3.4") confirms: immutable.Seq[String] = confirms,
@deprecated("Channel will be removed.", since = "2.3.4") confirmMessage: Delivered = confirmMessage,
@deprecated("Channel will be removed.", since = "2.3.4") confirmTarget: ActorRef = confirmTarget,
sender: ActorRef = sender): PersistentRepr
}
@ -270,11 +292,11 @@ object PersistentRepr {
sequenceNr: Long = 0L,
@deprecatedName('processorId) persistenceId: String = PersistentRepr.Undefined,
deleted: Boolean = false,
redeliveries: Int = 0,
confirms: immutable.Seq[String] = Nil,
confirmable: Boolean = false,
confirmMessage: Delivered = null,
confirmTarget: ActorRef = null,
@deprecated("Channel will be removed.", since = "2.3.4") redeliveries: Int = 0,
@deprecated("Channel will be removed.", since = "2.3.4") confirms: immutable.Seq[String] = Nil,
@deprecated("Channel will be removed.", since = "2.3.4") confirmable: Boolean = false,
@deprecated("Channel will be removed.", since = "2.3.4") confirmMessage: Delivered = null,
@deprecated("Channel will be removed.", since = "2.3.4") confirmTarget: ActorRef = null,
sender: ActorRef = null) =
if (confirmable) ConfirmablePersistentImpl(payload, sequenceNr, persistenceId, deleted, redeliveries, confirms, confirmMessage, confirmTarget, sender)
else PersistentImpl(payload, sequenceNr, persistenceId, deleted, confirms, sender)
@ -285,14 +307,6 @@ object PersistentRepr {
def create = apply _
}
object PersistentBatch {
/**
* Java API.
*/
def create(persistentBatch: JIterable[Persistent]) =
PersistentBatch(immutableSeq(persistentBatch))
}
/**
* INTERNAL API.
*/

View file

@ -48,7 +48,7 @@ object Update {
/**
* A view replicates the persistent message stream of a [[PersistentActor]]. Implementation classes receive
* the message stream directly from the Journal. These messages can be processed to update internal state
* in order to maintain an (eventual consistent) view of the state of the corresponding processor. A
* in order to maintain an (eventual consistent) view of the state of the corresponding persistent actor. A
* persistent view can also run on a different node, provided that a replicated journal is used.
*
* Implementation classes refer to a persistent actors' message stream by implementing `persistenceId`

View file

@ -14,8 +14,8 @@ import scala.util.control.NonFatal
/**
* Recovery state machine that loads snapshots and replays messages.
*
* @see [[Processor]]
* @see [[View]]
* @see [[PersistentActor]]
* @see [[PersistentView]]
*/
trait Recovery extends Actor with Snapshotter with Stash with StashFactory {
/**
@ -187,11 +187,13 @@ trait Recovery extends Actor with Snapshotter with Stash with StashFactory {
/**
* Returns the current persistent message if there is any.
*/
@deprecated("currentPersistentMessage will be removed, sequence number can be retrieved with `lastSequenceNr`.", since = "2.3.4")
implicit def currentPersistentMessage: Option[Persistent] = Option(_currentPersistent)
/**
* Java API: returns the current persistent message or `null` if there is none.
*/
@deprecated("getCurrentPersistentMessage will be removed, sequence number can be retrieved with `lastSequenceNr`.", since = "2.3.4")
def getCurrentPersistentMessage = currentPersistentMessage.getOrElse(null)
/**
@ -256,12 +258,12 @@ trait Recovery extends Actor with Snapshotter with Stash with StashFactory {
}
/**
* Instructs a processor to recover itself. Recovery will start from a snapshot if the processor has
* Instructs a persistent actor to recover itself. Recovery will start from a snapshot if the persistent actor has
* previously saved one or more snapshots and at least one of these snapshots matches the specified
* `fromSnapshot` criteria. Otherwise, recovery will start from scratch by replaying all journaled
* messages.
*
* If recovery starts from a snapshot, the processor is offered that snapshot with a [[SnapshotOffer]]
* If recovery starts from a snapshot, the persistent actor is offered that snapshot with a [[SnapshotOffer]]
* message, followed by replayed messages, if any, that are younger than the snapshot, up to the
* specified upper sequence number bound (`toSequenceNr`).
*

View file

@ -8,7 +8,7 @@ package akka.persistence
/**
* Snapshot metadata.
*
* @param persistenceId id of processor from which the snapshot was taken.
* @param persistenceId id of persistent actor from which the snapshot was taken.
* @param sequenceNr sequence number at which the snapshot was taken.
* @param timestamp time at which the snapshot was saved.
*/
@ -17,7 +17,7 @@ final case class SnapshotMetadata(@deprecatedName('processorId) persistenceId: S
//#snapshot-metadata
/**
* Sent to a [[Processor]] after successful saving of a snapshot.
* Sent to a [[PersistentActor]] after successful saving of a snapshot.
*
* @param metadata snapshot metadata.
*/
@ -25,7 +25,7 @@ final case class SnapshotMetadata(@deprecatedName('processorId) persistenceId: S
final case class SaveSnapshotSuccess(metadata: SnapshotMetadata)
/**
* Sent to a [[Processor]] after failed saving of a snapshot.
* Sent to a [[PersistentActor]] after failed saving of a snapshot.
*
* @param metadata snapshot metadata.
* @param cause failure cause.
@ -34,7 +34,7 @@ final case class SaveSnapshotSuccess(metadata: SnapshotMetadata)
final case class SaveSnapshotFailure(metadata: SnapshotMetadata, cause: Throwable)
/**
* Offers a [[Processor]] a previously saved `snapshot` during recovery. This offer is received
* Offers a [[PersistentActor]] a previously saved `snapshot` during recovery. This offer is received
* before any further replayed messages.
*/
@SerialVersionUID(1L)
@ -110,13 +110,13 @@ object SelectedSnapshot {
/**
* INTERNAL API.
*
* Defines messages exchanged between processors and a snapshot store.
* Defines messages exchanged between persistent actors and a snapshot store.
*/
private[persistence] object SnapshotProtocol {
/**
* Instructs a snapshot store to load a snapshot.
*
* @param persistenceId processor id.
* @param persistenceId persistent actor id.
* @param criteria criteria for selecting a snapshot from which recovery should start.
* @param toSequenceNr upper sequence number bound (inclusive) for recovery.
*/
@ -147,7 +147,7 @@ private[persistence] object SnapshotProtocol {
/**
* Instructs snapshot store to delete all snapshots that match `criteria`.
*
* @param persistenceId processor id.
* @param persistenceId persistent actor id.
* @param criteria criteria for selecting snapshots to be deleted.
*/
final case class DeleteSnapshots(@deprecatedName('processorId) persistenceId: String, criteria: SnapshotSelectionCriteria)

View file

@ -27,7 +27,7 @@ trait AsyncRecovery {
* The channel ids of delivery confirmations that are available for a replayed
* message must be contained in that message's `confirms` sequence.
*
* @param persistenceId processor id.
* @param persistenceId persistent actor id.
* @param fromSequenceNr sequence number where replay should start (inclusive).
* @param toSequenceNr sequence number where replay should end (inclusive).
* @param max maximum number of messages to be replayed.
@ -43,7 +43,7 @@ trait AsyncRecovery {
* Plugin API: asynchronously reads the highest stored sequence number for the
* given `persistenceId`.
*
* @param persistenceId processor id.
* @param persistenceId persistent actor id.
* @param fromSequenceNr hint where to start searching for the highest sequence
* number.
*/

View file

@ -34,7 +34,7 @@ private[persistence] class InmemJournal extends AsyncWriteProxy {
* INTERNAL API.
*/
private[persistence] trait InmemMessages {
// processor id -> persistent message
// persistentActorId -> persistent message
var messages = Map.empty[String, Vector[PersistentRepr]]
def add(p: PersistentRepr) = messages = messages + (messages.get(p.persistenceId) match {

View file

@ -11,7 +11,7 @@ import akka.actor.Actor
/**
* INTERNAL API.
*
* LevelDB backed persistent mapping of `String`-based processor and channel ids to numeric ids.
* LevelDB backed persistent mapping of `String`-based persistent actor and channel ids to numeric ids.
*/
private[persistence] trait LeveldbIdMapping extends Actor { this: LeveldbStore
import Key._
@ -20,7 +20,7 @@ private[persistence] trait LeveldbIdMapping extends Actor { this: LeveldbStore
private var idMap: Map[String, Int] = Map.empty
/**
* Get the mapped numeric id for the specified processor or channel `id`. Creates and
* Get the mapped numeric id for the specified persistent actor or channel `id`. Creates and
* stores a new mapping if necessary.
*/
def numericId(id: String): Int = idMap.get(id) match {

View file

@ -36,10 +36,6 @@ private[persistence] trait LeveldbStore extends Actor with LeveldbIdMapping with
if (nativeLeveldb) org.fusesource.leveldbjni.JniDBFactory.factory
else org.iq80.leveldb.impl.Iq80DBFactory.factory
// TODO: support migration of processor and channel ids
// needed if default processor and channel ids are used
// (actor paths, which contain deployment information).
val serialization = SerializationExtension(context.system)
import Key._

View file

@ -39,15 +39,15 @@ trait SnapshotStore extends Actor {
} to (self, p)
case evt @ SaveSnapshotSuccess(metadata)
saved(metadata)
sender() ! evt // sender is processor
sender() ! evt // sender is persistentActor
case evt @ SaveSnapshotFailure(metadata, _)
delete(metadata)
sender() ! evt // sender is processor
sender() ! evt // sender is persistentActor
case d @ DeleteSnapshot(metadata)
delete(metadata)
if (publish) context.system.eventStream.publish(d)
case d @ DeleteSnapshots(processorId, criteria)
delete(processorId, criteria)
case d @ DeleteSnapshots(persistenceId, criteria)
delete(persistenceId, criteria)
if (publish) context.system.eventStream.publish(d)
}
@ -55,7 +55,7 @@ trait SnapshotStore extends Actor {
/**
* Plugin API: asynchronously loads a snapshot.
*
* @param processorId processor id.
* @param persistenceId processor id.
* @param criteria selection criteria for loading.
*/
def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]]

View file

@ -41,7 +41,7 @@ object AtLeastOnceDeliverySpec {
destinations: Map[String, ActorPath])
extends PersistentActor with AtLeastOnceDelivery with ActorLogging {
override def processorId: String = name
override def persistenceId: String = name
def updateState(evt: Evt): Unit = evt match {
case AcceptedReq(payload, destination)