parent
d9f4933114
commit
36259079aa
2 changed files with 76 additions and 74 deletions
|
|
@ -36,13 +36,13 @@ Akka Persistence also provides point-to-point communication with at-least-once m
|
|||
|
||||
### Architecture
|
||||
|
||||
* @scala[`PersistentActor`]@java[`AbstractPersistentActor`]: Is a persistent, stateful actor. It is able to persist events to a journal and can react to
|
||||
* @scala[@scaladoc[PersistentActor](akka.persistence.PersistentActor)]@java[@javadoc[AbstractPersistentActor](akka.persistence.AbstractPersistentActor)]: Is a persistent, stateful actor. It is able to persist events to a journal and can react to
|
||||
them in a thread-safe manner. It can be used to implement both *command* as well as *event sourced* actors.
|
||||
When a persistent actor is started or restarted, journaled messages are replayed to that actor so that it can
|
||||
recover its state from these messages.
|
||||
* @scala[`AtLeastOnceDelivery`]@java[`AbstractPersistentActorAtLeastOnceDelivery`]: To send messages with at-least-once delivery semantics to destinations, also in
|
||||
* @scala[@scaladoc[AtLeastOnceDelivery](akka.persistence.AtLeastOnceDelivery)]@java[@javadoc[AbstractPersistentActorWithAtLeastOnceDelivery](akka.persistence.AbstractPersistentActorWithAtLeastOnceDelivery)]: To send messages with at-least-once delivery semantics to destinations, also in
|
||||
case of sender and receiver JVM crashes.
|
||||
* `AsyncWriteJournal`: A journal stores the sequence of messages sent to a persistent actor. An application can control which messages
|
||||
* @scala[@scaladoc[AsyncWriteJournal](akka.persistence.journal.AsyncWriteJournal)]@java[@javadoc[AsyncWriteJournal](akka.persistence.journal.japi.AsyncWriteJournal)]: A journal stores the sequence of messages sent to a persistent actor. An application can control which messages
|
||||
are journaled and which are received by the persistent actor without being journaled. Journal maintains `highestSequenceNr` that is increased on each message.
|
||||
The storage backend of a journal is pluggable.
|
||||
Replicated journals are available as [Community plugins](https://akka.io/community/).
|
||||
|
|
@ -54,9 +54,9 @@ development of event sourced applications (see section @ref:[Event Sourcing](typ
|
|||
|
||||
## Example
|
||||
|
||||
Akka persistence supports Event Sourcing with the @scala[`PersistentActor` trait]@java[`AbstractPersistentActor` abstract class]. An actor that extends this @scala[trait]@java[class] uses the
|
||||
`persist` method to persist and handle events. The behavior of @scala[a `PersistentActor`]@java[an `AbstractPersistentActor`]
|
||||
is defined by implementing @scala[`receiveRecover`]@java[`createReceiveRecover`] and @scala[`receiveCommand`]@java[`createReceive`]. This is demonstrated in the following example.
|
||||
Akka persistence supports Event Sourcing with the @scala[@scaladoc[PersistentActor](akka.persistence.PersistentActor) trait]@java[@javadoc[AbstractPersistentActor](akka.persistence.AbstractPersistentActor) abstract class]. An actor that extends this @scala[trait]@java[class] uses the
|
||||
@scala[@scaladoc[persist](akka.persistence.PersistentActor#persist[A](event:A)(handler:A=%3EUnit):Unit)]@java[@javadoc[persist](akka.persistence.AbstractPersistentActorLike#persist(A,akka.japi.Procedure))] method to persist and handle events. The behavior of @scala[a `PersistentActor`]@java[an `AbstractPersistentActor`]
|
||||
is defined by implementing @scala[@scaladoc[receiveRecover](akka.persistence.PersistentActor#receiveRecover:Eventsourced.this.Receive)]@java[@javadoc[createReceiveRecover](akka.persistence.AbstractPersistentActorLike#createReceiveRecover())] and @scala[@scaladoc[receiveCommand](akka.persistence.PersistentActor#receiveCommand:Eventsourced.this.Receive)]@java[@javadoc[createReceive](akka.persistence.AbstractPersistentActorLike#createReceive())]. This is demonstrated in the following example.
|
||||
|
||||
Scala
|
||||
: @@snip [PersistentActorExample.scala](/akka-docs/src/test/scala/docs/persistence/PersistentActorExample.scala) { #persistent-actor-example }
|
||||
|
|
@ -94,9 +94,9 @@ by default) and the actor continues with the next message.
|
|||
@@@ note
|
||||
|
||||
It's also possible to switch between different command handlers during normal processing and recovery
|
||||
with @scala[`context.become()`]@java[`getContext().become()`] and @scala[`context.unbecome()`]@java[`getContext().unbecome()`]. To get the actor into the same state after
|
||||
with @scala[@scaladoc[context.become()](akka.actor.ActorContext#become(behavior:akka.actor.Actor.Receive):Unit)]@java[@javadoc[getContext().become()](akka.actor.ActorContext#become(scala.PartialFunction))] and @scala[@scaladoc[context.unbecome()](akka.actor.ActorContext#unbecome():Unit)]@java[@javadoc[getContext().unbecome()](akka.actor.ActorContext#unbecome())]. To get the actor into the same state after
|
||||
recovery you need to take special care to perform the same state transitions with `become` and
|
||||
`unbecome` in the @scala[`receiveRecover`]@java[`createReceiveRecover`] method as you would have done in the command handler.
|
||||
`unbecome` in the @scala[@scaladoc[receiveRecover](akka.persistence.PersistentActor#receiveRecover:Eventsourced.this.Receive)]@java[@javadoc[createReceiveRecover](akka.persistence.AbstractPersistentActorLike#createReceiveRecover())] method as you would have done in the command handler.
|
||||
Note that when using `become` from @scala[`receiveRecover`]@java[`createReceiveRecover`] it will still only use the @scala[`receiveRecover`]@java[`createReceiveRecover`]
|
||||
behavior when replaying the events. When replay is completed it will use the new behavior.
|
||||
|
||||
|
|
@ -141,17 +141,17 @@ akka.persistence.max-concurrent-recoveries = 50
|
|||
|
||||
Accessing the @scala[`sender()`]@java[sender with `getSender()`] for replayed messages will always result in a `deadLetters` reference,
|
||||
as the original sender is presumed to be long gone. If you indeed have to notify an actor during
|
||||
recovery in the future, store its `ActorPath` explicitly in your persisted events.
|
||||
recovery in the future, store its @apidoc[akka.actor.ActorPath] explicitly in your persisted events.
|
||||
|
||||
@@@
|
||||
|
||||
<a id="recovery-custom"></a>
|
||||
#### Recovery customization
|
||||
|
||||
Applications may also customise how recovery is performed by returning a customised `Recovery` object
|
||||
in the `recovery` method of a @scala[`PersistentActor`]@java[`AbstractPersistentActor`],
|
||||
Applications may also customise how recovery is performed by returning a customised @apidoc[akka.persistence.Recovery$] object
|
||||
in the `recovery` method of a @scala[@scaladoc[PersistentActor](akka.persistence.PersistentActor)]@java[@javadoc[AbstractPersistentActor](akka.persistence.AbstractPersistentActor)],
|
||||
|
||||
To skip loading snapshots and replay all events you can use @scala[`SnapshotSelectionCriteria.None`]@java[`SnapshotSelectionCriteria.none()`].
|
||||
To skip loading snapshots and replay all events you can use @scala[@scaladoc[SnapshotSelectionCriteria.None](akka.persistence.SnapshotSelectionCriteria$#None:akka.persistence.SnapshotSelectionCriteria)]@java[@javadoc[SnapshotSelectionCriteria.none()](akka.persistence.SnapshotSelectionCriteria#none())].
|
||||
This can be useful if snapshot serialization format has changed in an incompatible way.
|
||||
It should typically not be used when events have been deleted.
|
||||
|
||||
|
|
@ -192,7 +192,7 @@ Java
|
|||
|
||||
Sometimes there is a need for performing additional initialization when the
|
||||
recovery has completed before processing any other message sent to the persistent actor.
|
||||
The persistent actor will receive a special `RecoveryCompleted` message right after recovery
|
||||
The persistent actor will receive a special @apidoc[akka.persistence.RecoveryCompleted$] message right after recovery
|
||||
and before any other received messages.
|
||||
|
||||
Scala
|
||||
|
|
@ -211,12 +211,12 @@ is called (logging the error by default) and the actor will be stopped.
|
|||
### Internal stash
|
||||
|
||||
The persistent actor has a private @ref:[stash](actors.md#stash) for internally caching incoming messages during
|
||||
[recovery](#recovery) or the `persist\persistAll` method persisting events. You can still use/inherit from the
|
||||
`Stash` interface. The internal stash cooperates with the normal stash by hooking into `unstashAll` method and
|
||||
[recovery](#recovery) or the @scala[@scaladoc[persist](akka.persistence.PersistentActor#persist[A](event:A)(handler:A=%3EUnit):Unit)]@java[@javadoc[persist](akka.persistence.AbstractPersistentActorLike#persist(A,akka.japi.Procedure))]\@scala[@scaladoc[persistAll](akka.persistence.PersistentActor#persistAll[A](events:Seq[A])(handler:A=%3EUnit):Unit)]@java[@javadoc[persistAll](akka.persistence.AbstractPersistentActorLike#persistAll(java.lang.Iterable,akka.japi.Procedure))] method persisting events. You can still use/inherit from the
|
||||
@apidoc[akka.actor.Stash] interface. The internal stash cooperates with the normal stash by hooking into @apidoc[unstashAll](akka.actor.Stash) {scala="#unstashAll():Unit" java="#unstashAll()"}
|
||||
making sure messages are unstashed properly to the internal stash to maintain ordering guarantees.
|
||||
|
||||
You should be careful to not send more messages to a persistent actor than it can keep up with, otherwise the number
|
||||
of stashed messages will grow without bounds. It can be wise to protect against `OutOfMemoryError` by defining a
|
||||
of stashed messages will grow without bounds. It can be wise to protect against @javadoc[OutOfMemoryError](java.lang.OutOfMemoryError) by defining a
|
||||
maximum stash capacity in the mailbox configuration:
|
||||
|
||||
```
|
||||
|
|
@ -226,11 +226,11 @@ akka.actor.default-mailbox.stash-capacity=10000
|
|||
Note that the stash capacity is per actor. If you have many persistent actors, e.g. when using cluster sharding,
|
||||
you may need to define a small stash capacity to ensure that the total number of stashed messages in the system
|
||||
doesn't consume too much memory. Additionally, the persistent actor defines three strategies to handle failure when the
|
||||
internal stash capacity is exceeded. The default overflow strategy is the `ThrowOverflowExceptionStrategy`, which
|
||||
discards the current received message and throws a `StashOverflowException`, causing actor restart if the default
|
||||
supervision strategy is used. You can override the `internalStashOverflowStrategy` method to return
|
||||
`DiscardToDeadLetterStrategy` or `ReplyToStrategy` for any "individual" persistent actor, or define the "default"
|
||||
for all persistent actors by providing FQCN, which must be a subclass of `StashOverflowStrategyConfigurator`, in the
|
||||
internal stash capacity is exceeded. The default overflow strategy is the @apidoc[ThrowOverflowExceptionStrategy$], which
|
||||
discards the current received message and throws a @apidoc[akka.actor.StashOverflowException], causing actor restart if the default
|
||||
supervision strategy is used. You can override the @apidoc[internalStashOverflowStrategy](akka.persistence.PersistenceStash) {scala="#internalStashOverflowStrategy:akka.persistence.StashOverflowStrategy" java="#internalStashOverflowStrategy()"} method to return
|
||||
@apidoc[DiscardToDeadLetterStrategy$] or @apidoc[ReplyToStrategy] for any "individual" persistent actor, or define the "default"
|
||||
for all persistent actors by providing FQCN, which must be a subclass of @apidoc[StashOverflowStrategyConfigurator], in the
|
||||
persistence configuration:
|
||||
|
||||
```
|
||||
|
|
@ -239,7 +239,7 @@ akka.persistence.internal-stash-overflow-strategy=
|
|||
```
|
||||
|
||||
The `DiscardToDeadLetterStrategy` strategy also has a pre-packaged companion configurator
|
||||
`akka.persistence.DiscardConfigurator`.
|
||||
@apidoc[akka.persistence.DiscardConfigurator].
|
||||
|
||||
You can also query the default strategy via the Akka persistence extension singleton:
|
||||
|
||||
|
|
@ -267,14 +267,14 @@ be discarded. You can use bounded stash instead of it.
|
|||
<a id="persist-async"></a>
|
||||
### Relaxed local consistency requirements and high throughput use-cases
|
||||
|
||||
If faced with relaxed local consistency requirements and high throughput demands sometimes `PersistentActor` and its
|
||||
`persist` may not be enough in terms of consuming incoming Commands at a high rate, because it has to wait until all
|
||||
If faced with relaxed local consistency requirements and high throughput demands sometimes @scala[`PersistentActor`]@java[`AbstractPersistentActor`] and its
|
||||
@scala[@scaladoc[persist](akka.persistence.PersistentActor#persist[A](event:A)(handler:A=%3EUnit):Unit)]@java[@javadoc[persist](akka.persistence.AbstractPersistentActorLike#persist(A,akka.japi.Procedure))] may not be enough in terms of consuming incoming Commands at a high rate, because it has to wait until all
|
||||
Events related to a given Command are processed in order to start processing the next Command. While this abstraction is
|
||||
very useful for most cases, sometimes you may be faced with relaxed requirements about consistency – for example you may
|
||||
want to process commands as fast as you can, assuming that the Event will eventually be persisted and handled properly in
|
||||
the background, retroactively reacting to persistence failures if needed.
|
||||
|
||||
The `persistAsync` method provides a tool for implementing high-throughput persistent actors. It will *not*
|
||||
The @scala[@scaladoc[persistAsync](akka.persistence.PersistentActor#persistAsync[A](event:A)(handler:A=%3EUnit):Unit)]@java[@javadoc[persistAsync](akka.persistence.AbstractPersistentActorLike#persistAsync(A,akka.japi.Procedure))] method provides a tool for implementing high-throughput persistent actors. It will *not*
|
||||
stash incoming Commands while the Journal is still working on persisting and/or user code is executing event callbacks.
|
||||
|
||||
In the below example, the event callbacks may be called "at any time", even after the next Command has been processed.
|
||||
|
|
@ -303,9 +303,9 @@ The callback will not be invoked if the actor is restarted (or stopped) in betwe
|
|||
<a id="defer"></a>
|
||||
### Deferring actions until preceding persist handlers have executed
|
||||
|
||||
Sometimes when working with `persistAsync` or `persist` you may find that it would be nice to define some actions in terms of
|
||||
''happens-after the previous `persistAsync`/`persist` handlers have been invoked''. `PersistentActor` provides utility methods
|
||||
called `defer` and `deferAsync`, which work similarly to `persist` and `persistAsync` respectively yet do not persist the
|
||||
Sometimes when working with @scala[@scaladoc[persistAsync](akka.persistence.PersistentActor#persistAsync[A](event:A)(handler:A=%3EUnit):Unit)]@java[@javadoc[persistAsync](akka.persistence.AbstractPersistentActorLike#persistAsync(A,akka.japi.Procedure))] or @scala[@scaladoc[persist](akka.persistence.PersistentActor#persist[A](event:A)(handler:A=%3EUnit):Unit)]@java[@javadoc[persist](akka.persistence.AbstractPersistentActorLike#persist(A,akka.japi.Procedure))] you may find that it would be nice to define some actions in terms of
|
||||
''happens-after the previous `persistAsync`/`persist` handlers have been invoked''. @scala[`PersistentActor`]@java[`AbstractPersistentActor`] provides utility methods
|
||||
called @scala[@scaladoc[defer](akka.persistence.PersistentActor#defer[A](event:A)(handler:A=%3EUnit):Unit)]@java[@javadoc[defer](akka.persistence.AbstractPersistentActorLike#defer(A,akka.japi.Procedure))] and @scala[@scaladoc[deferAsync](akka.persistence.PersistentActor#deferAsync[A](event:A)(handler:A=%3EUnit):Unit)]@java[@javadoc[deferAsync](akka.persistence.AbstractPersistentActorLike#deferAsync(A,akka.japi.Procedure))], which work similarly to `persist` and `persistAsync` respectively yet do not persist the
|
||||
passed in event. It is recommended to use them for *read* operations, and actions which do not have corresponding events in your
|
||||
domain model.
|
||||
|
||||
|
|
@ -346,7 +346,7 @@ The callback will not be invoked if the actor is restarted (or stopped) in betwe
|
|||
|
||||
### Nested persist calls
|
||||
|
||||
It is possible to call `persist` and `persistAsync` inside their respective callback blocks and they will properly
|
||||
It is possible to call @scala[@scaladoc[persist](akka.persistence.PersistentActor#persist[A](event:A)(handler:A=%3EUnit):Unit)]@java[@javadoc[persist](akka.persistence.AbstractPersistentActorLike#persist(A,akka.japi.Procedure))] and @scala[@scaladoc[persistAsync](akka.persistence.PersistentActor#persistAsync[A](event:A)(handler:A=%3EUnit):Unit)]@java[@javadoc[persistAsync](akka.persistence.AbstractPersistentActorLike#persistAsync(A,akka.japi.Procedure))] inside their respective callback blocks and they will properly
|
||||
retain both the thread safety (including the right value of @scala[`sender()`]@java[`getSender()`]) as well as stashing guarantees.
|
||||
|
||||
In general it is encouraged to create command handlers which do not need to resort to nested event persisting,
|
||||
|
|
@ -360,7 +360,7 @@ Scala
|
|||
Java
|
||||
: @@snip [LambdaPersistenceDocTest.java](/akka-docs/src/test/java/jdocs/persistence/LambdaPersistenceDocTest.java) { #nested-persist-persist }
|
||||
|
||||
When sending two commands to this `PersistentActor`, the persist handlers will be executed in the following order:
|
||||
When sending two commands to this @scala[`PersistentActor`]@java[`AbstractPersistentActor`], the persist handlers will be executed in the following order:
|
||||
|
||||
Scala
|
||||
: @@snip [PersistenceDocSpec.scala](/akka-docs/src/test/scala/docs/persistence/PersistenceDocSpec.scala) { #nested-persist-persist-caller }
|
||||
|
|
@ -411,7 +411,7 @@ and the actor will unconditionally be stopped.
|
|||
The reason that it cannot resume when persist fails is that it is unknown if the event was actually
|
||||
persisted or not, and therefore it is in an inconsistent state. Restarting on persistent failures
|
||||
will most likely fail anyway since the journal is probably unavailable. It is better to stop the
|
||||
actor and after a back-off timeout start it again. The `akka.pattern.BackoffSupervisor` actor
|
||||
actor and after a back-off timeout start it again. The @apidoc[akka.pattern.BackoffSupervisor$] actor
|
||||
is provided to support such restarts.
|
||||
|
||||
Scala
|
||||
|
|
@ -434,18 +434,18 @@ if you for example know that serialization format has changed in an incompatible
|
|||
### Atomic writes
|
||||
|
||||
Each event is stored atomically, but it is also possible to store several events atomically by
|
||||
using the `persistAll` or `persistAllAsync` method. That means that all events passed to that method
|
||||
using the @scala[@scaladoc[persistAll](akka.persistence.PersistentActor#persistAll[A](events:Seq[A])(handler:A=%3EUnit):Unit)]@java[@javadoc[persistAll](akka.persistence.AbstractPersistentActorLike#persistAll(java.lang.Iterable,akka.japi.Procedure))] or @scala[@scaladoc[persistAllAsync](akka.persistence.PersistentActor#persistAllAsync[A](events:Seq[A])(handler:A=%3EUnit):Unit)]@java[@javadoc[persistAllAsync](akka.persistence.AbstractPersistentActorLike#persistAllAsync(java.lang.Iterable,akka.japi.Procedure))] method. That means that all events passed to that method
|
||||
are stored or none of them are stored if there is an error.
|
||||
|
||||
The recovery of a persistent actor will therefore never be done partially with only a subset of events persisted by
|
||||
*persistAll*.
|
||||
|
||||
Some journals may not support atomic writes of several events and they will then reject the `persistAll`
|
||||
command, i.e. `onPersistRejected` is called with an exception (typically `UnsupportedOperationException`).
|
||||
command, i.e. `onPersistRejected` is called with an exception (typically @javadoc[UnsupportedOperationException](java.lang.UnsupportedOperationException)).
|
||||
|
||||
### Batch writes
|
||||
|
||||
In order to optimize throughput when using `persistAsync`, a persistent actor
|
||||
In order to optimize throughput when using @scala[@scaladoc[persistAsync](akka.persistence.PersistentActor#persistAsync[A](event:A)(handler:A=%3EUnit):Unit)]@java[@javadoc[persistAsync](akka.persistence.AbstractPersistentActorLike#persistAsync(A,akka.japi.Procedure))], a persistent actor
|
||||
internally batches events to be stored under high load before writing them to
|
||||
the journal (as a single batch). The batch size is dynamically determined by
|
||||
how many events are emitted during the time of a journal round-trip: after
|
||||
|
|
@ -456,7 +456,7 @@ timer-based which keeps latencies at a minimum.
|
|||
### Message deletion
|
||||
|
||||
It is possible to delete all messages (journaled by a single persistent actor) up to a specified sequence number;
|
||||
Persistent actors may call the `deleteMessages` method to this end.
|
||||
Persistent actors may call the @scala[@scaladoc[deleteMessages](akka.persistence.PersistentActor#deleteMessages(toSequenceNr:Long):Unit)]@java[@javadoc[deleteMessages](akka.persistence.AbstractPersistentActorLike#deleteMessages(long))] method to this end.
|
||||
|
||||
Deleting messages in Event Sourcing based applications is typically either not used at all, or used in conjunction with
|
||||
[snapshotting](#snapshots), i.e. after a snapshot has been successfully stored, a `deleteMessages(toSequenceNr)`
|
||||
|
|
@ -472,8 +472,8 @@ you have to design your application so that it is not affected by missing messag
|
|||
|
||||
@@@
|
||||
|
||||
The result of the `deleteMessages` request is signaled to the persistent actor with a `DeleteMessagesSuccess`
|
||||
message if the delete was successful or a `DeleteMessagesFailure` message if it failed.
|
||||
The result of the `deleteMessages` request is signaled to the persistent actor with a @apidoc[akka.persistence.DeleteMessagesSuccess]
|
||||
message if the delete was successful or a @apidoc[akka.persistence.DeleteMessagesFailure] message if it failed.
|
||||
|
||||
Message deletion doesn't affect the highest sequence number of the journal, even if all messages were deleted from it after `deleteMessages` invocation.
|
||||
|
||||
|
|
@ -497,7 +497,7 @@ For critical failures, such as recovery or persisting events failing, the persis
|
|||
handler is invoked. This is because if the underlying journal implementation is signalling persistence failures it is most
|
||||
likely either failing completely or overloaded and restarting right-away and trying to persist the event again will most
|
||||
likely not help the journal recover – as it would likely cause a [Thundering herd problem](https://en.wikipedia.org/wiki/Thundering_herd_problem), as many persistent actors
|
||||
would restart and try to persist their events again. Instead, using a `BackoffSupervisor` (as described in @ref:[Failures](#failures)) which
|
||||
would restart and try to persist their events again. Instead, using a @apidoc[BackoffSupervisor](akka.pattern.BackoffSupervisor$) (as described in @ref:[Failures](#failures)) which
|
||||
implements an exponential-backoff strategy which allows for more breathing room for the journal to recover between
|
||||
restarts of the persistent actor.
|
||||
|
||||
|
|
@ -520,7 +520,7 @@ to signal to an Actor that it should stop itself once it receives this message
|
|||
automatically by Akka, leaving the target actor no way to refuse stopping itself when given a poison pill.
|
||||
|
||||
This can be dangerous when used with `PersistentActor` due to the fact that incoming commands are *stashed* while
|
||||
the persistent actor is awaiting confirmation from the Journal that events have been written when `persist()` was used.
|
||||
the persistent actor is awaiting confirmation from the Journal that events have been written when @scala[@scaladoc[persist()](akka.persistence.PersistentActor#persist[A](event:A)(handler:A=%3EUnit):Unit)]@java[@javadoc[persist()](akka.persistence.AbstractPersistentActorLike#persist(A,akka.japi.Procedure))] was used.
|
||||
Since the incoming commands will be drained from the Actor's mailbox and put into its internal stash while awaiting the
|
||||
confirmation (thus, before calling the persist handlers) the Actor **may receive and (auto)handle the PoisonPill
|
||||
before it processes the other messages which have been put into its stash**, causing a pre-mature shutdown of the Actor.
|
||||
|
|
@ -562,8 +562,8 @@ See @ref:[Replay filter](typed/persistence.md#replay-filter) in the documentatio
|
|||
|
||||
As you model your domain using actors, you may notice that some actors may be prone to accumulating extremely long event logs and experiencing long recovery times. Sometimes, the right approach may be to split out into a set of shorter lived actors. However, when this is not an option, you can use snapshots to reduce recovery times drastically.
|
||||
|
||||
Persistent actors can save snapshots of internal state by calling the `saveSnapshot` method. If saving of a snapshot
|
||||
succeeds, the persistent actor receives a `SaveSnapshotSuccess` message, otherwise a `SaveSnapshotFailure` message
|
||||
Persistent actors can save snapshots of internal state by calling the @apidoc[saveSnapshot](akka.persistence.Snapshotter) {scala="#saveSnapshot(snapshot:Any):Unit" java="#saveSnapshot(java.lang.Object)"} method. If saving of a snapshot
|
||||
succeeds, the persistent actor receives a @apidoc[akka.persistence.SaveSnapshotSuccess] message, otherwise a @apidoc[akka.persistence.SaveSnapshotFailure] message
|
||||
|
||||
Scala
|
||||
: @@snip [PersistenceDocSpec.scala](/akka-docs/src/test/scala/docs/persistence/PersistenceDocSpec.scala) { #save-snapshot }
|
||||
|
|
@ -571,13 +571,13 @@ Scala
|
|||
Java
|
||||
: @@snip [LambdaPersistenceDocTest.java](/akka-docs/src/test/java/jdocs/persistence/LambdaPersistenceDocTest.java) { #save-snapshot }
|
||||
|
||||
where `metadata` is of type `SnapshotMetadata` and contains:
|
||||
where `metadata` is of type @apidoc[akka.persistence.SnapshotMetadata] and contains:
|
||||
|
||||
* persistenceId
|
||||
* sequenceNr
|
||||
* timestamp
|
||||
|
||||
During recovery, the persistent actor is offered the latest saved snapshot via a `SnapshotOffer` message from
|
||||
During recovery, the persistent actor is offered the latest saved snapshot via a @apidoc[akka.persistence.SnapshotOffer] message from
|
||||
which it can initialize internal state.
|
||||
|
||||
Scala
|
||||
|
|
@ -590,7 +590,7 @@ The replayed messages that follow the `SnapshotOffer` message, if any, are young
|
|||
They finally recover the persistent actor to its current (i.e. latest) state.
|
||||
|
||||
In general, a persistent actor is only offered a snapshot if that persistent actor has previously saved one or more snapshots
|
||||
and at least one of these snapshots matches the `SnapshotSelectionCriteria` that can be specified for recovery.
|
||||
and at least one of these snapshots matches the @apidoc[akka.persistence.SnapshotSelectionCriteria] that can be specified for recovery.
|
||||
|
||||
Scala
|
||||
: @@snip [PersistenceDocSpec.scala](/akka-docs/src/test/scala/docs/persistence/PersistenceDocSpec.scala) { #snapshot-criteria }
|
||||
|
|
@ -598,8 +598,8 @@ Scala
|
|||
Java
|
||||
: @@snip [LambdaPersistenceDocTest.java](/akka-docs/src/test/java/jdocs/persistence/LambdaPersistenceDocTest.java) { #snapshot-criteria }
|
||||
|
||||
If not specified, they default to @scala[`SnapshotSelectionCriteria.Latest`]@java[`SnapshotSelectionCriteria.latest()`] which selects the latest (= youngest) snapshot.
|
||||
To disable snapshot-based recovery, applications should use @scala[`SnapshotSelectionCriteria.None`]@java[`SnapshotSelectionCriteria.none()`]. A recovery where no
|
||||
If not specified, they default to @scala[@scaladoc[SnapshotSelectionCriteria.Latest](akka.persistence.SnapshotSelectionCriteria$#Latest:akka.persistence.SnapshotSelectionCriteria)]@java[@javadoc[SnapshotSelectionCriteria.latest()](akka.persistence.SnapshotSelectionCriteria#latest())] which selects the latest (= youngest) snapshot.
|
||||
To disable snapshot-based recovery, applications should use @scala[@scaladoc[SnapshotSelectionCriteria.None](akka.persistence.SnapshotSelectionCriteria$#None:akka.persistence.SnapshotSelectionCriteria)]@java[@javadoc[SnapshotSelectionCriteria.none()](akka.persistence.SnapshotSelectionCriteria#none())]. A recovery where no
|
||||
saved snapshot matches the specified `SnapshotSelectionCriteria` will replay all journaled messages.
|
||||
|
||||
@@@ note
|
||||
|
|
@ -609,7 +609,7 @@ or the @scala[`PersistentActor`]@java[persistent actor] can pick a snapshot stor
|
|||
|
||||
Because some use cases may not benefit from or need snapshots, it is perfectly valid not to not configure a snapshot store.
|
||||
However, Akka will log a warning message when this situation is detected and then continue to operate until
|
||||
an actor tries to store a snapshot, at which point the operation will fail (by replying with an `SaveSnapshotFailure` for example).
|
||||
an actor tries to store a snapshot, at which point the operation will fail (by replying with an @apidoc[akka.persistence.SaveSnapshotFailure] for example).
|
||||
|
||||
Note that the "persistence mode" of @ref:[Cluster Sharding](cluster-sharding.md) makes use of snapshots. If you use that mode, you'll need to define a snapshot store plugin.
|
||||
|
||||
|
|
@ -617,11 +617,11 @@ Note that the "persistence mode" of @ref:[Cluster Sharding](cluster-sharding.md)
|
|||
|
||||
### Snapshot deletion
|
||||
|
||||
A persistent actor can delete individual snapshots by calling the `deleteSnapshot` method with the sequence number of
|
||||
A persistent actor can delete individual snapshots by calling the @apidoc[deleteSnapshot](akka.persistence.Snapshotter) {scala="#deleteSnapshot(sequenceNr:Long):Unit" java="#deleteSnapshot(long)"} method with the sequence number of
|
||||
when the snapshot was taken.
|
||||
|
||||
To bulk-delete a range of snapshots matching `SnapshotSelectionCriteria`,
|
||||
persistent actors should use the `deleteSnapshots` method. Depending on the journal used this might be inefficient. It is
|
||||
To bulk-delete a range of snapshots matching @apidoc[akka.persistence.SnapshotSelectionCriteria],
|
||||
persistent actors should use the @apidoc[deleteSnapshots](akka.persistence.Snapshotter) {scala="#deleteSnapshots(criteria:akka.persistence.SnapshotSelectionCriteria):Unit" java="#deleteSnapshots(akka.persistence.SnapshotSelectionCriteria)"} method. Depending on the journal used this might be inefficient. It is
|
||||
best practice to do specific deletes with `deleteSnapshot` or to include a `minSequenceNr` as well as a `maxSequenceNr`
|
||||
for the `SnapshotSelectionCriteria`.
|
||||
|
||||
|
|
@ -660,7 +660,7 @@ See @ref:[Scaling out](typed/persistence.md#scaling-out) in the documentation of
|
|||
|
||||
## At-Least-Once Delivery
|
||||
|
||||
To send messages with at-least-once delivery semantics to destinations you can @scala[mix-in `AtLeastOnceDelivery` trait to your `PersistentActor`]@java[extend the `AbstractPersistentActorWithAtLeastOnceDelivery` class instead of `AbstractPersistentActor`]
|
||||
To send messages with at-least-once delivery semantics to destinations you can @scala[mix-in @scaladoc[AtLeastOnceDelivery](akka.persistence.AtLeastOnceDelivery) trait to your @scaladoc[PersistentActor](akka.persistence.PersistentActor)]@java[extend the @javadoc[AbstractPersistentActorWithAtLeastOnceDelivery](akka.persistence.AbstractPersistentActorWithAtLeastOnceDelivery) class instead of @javadoc[AbstractPersistentActor](akka.persistence.AbstractPersistentActor)]
|
||||
on the sending side. It takes care of re-sending messages when they
|
||||
have not been confirmed within a configurable timeout.
|
||||
|
||||
|
|
@ -674,7 +674,7 @@ received.
|
|||
|
||||
At-least-once delivery implies that original message sending order is not always preserved,
|
||||
and the destination may receive duplicate messages.
|
||||
Semantics do not match those of a normal `ActorRef` send operation:
|
||||
Semantics do not match those of a normal @apidoc[akka.actor.ActorRef] send operation:
|
||||
|
||||
* it is not at-most-once delivery
|
||||
* message order for the same sender–receiver pair is not preserved due to
|
||||
|
|
@ -682,14 +682,14 @@ possible resends
|
|||
* after a crash and restart of the destination messages are still
|
||||
delivered to the new actor incarnation
|
||||
|
||||
These semantics are similar to what an `ActorPath` represents (see
|
||||
These semantics are similar to what an @apidoc[akka.actor.ActorPath] represents (see
|
||||
@ref:[Actor Lifecycle](actors.md#actor-lifecycle)), therefore you need to supply a path and not a
|
||||
reference when delivering messages. The messages are sent to the path with
|
||||
an actor selection.
|
||||
|
||||
@@@
|
||||
|
||||
Use the `deliver` method to send a message to a destination. Call the `confirmDelivery` method
|
||||
Use the @scala[@scaladoc[deliver](akka.persistence.AtLeastOnceDelivery#deliver(destination:akka.actor.ActorPath)(deliveryIdToMessage:Long=%3EAny):Unit)]@java[@javadoc[deliver](akka.persistence.AbstractPersistentActorWithAtLeastOnceDelivery#deliver(akka.actor.ActorPath,akka.japi.Function))] method to send a message to a destination. Call the @scala[@scaladoc[confirmDelivery](akka.persistence.AtLeastOnceDelivery#confirmDelivery(deliveryId:Long):Boolean)]@java[@javadoc[confirmDelivery](akka.persistence.AtLeastOnceDeliveryLike#confirmDelivery(long))] method
|
||||
when the destination has replied with a confirmation message.
|
||||
|
||||
### Relationship between deliver and confirmDelivery
|
||||
|
|
@ -726,43 +726,43 @@ correlation id (passed into the message). You can do this by storing such mappin
|
|||
from which you can retrieve the `deliveryId` to be passed into the `confirmDelivery` method once the receiver
|
||||
of your message has replied with your custom correlation id.
|
||||
|
||||
The @scala[`AtLeastOnceDelivery` trait]@java[`AbstractPersistentActorWithAtLeastOnceDelivery` class] has a state consisting of unconfirmed messages and a
|
||||
The @scala[@scaladoc[AtLeastOnceDelivery](akka.persistence.AtLeastOnceDelivery) trait]@java[@javadoc[AbstractPersistentActorWithAtLeastOnceDelivery](akka.persistence.AbstractPersistentActorWithAtLeastOnceDelivery) class] has a state consisting of unconfirmed messages and a
|
||||
sequence number. It does not store this state itself. You must persist events corresponding to the
|
||||
`deliver` and `confirmDelivery` invocations from your `PersistentActor` so that the state can
|
||||
@scala[@scaladoc[deliver](akka.persistence.AtLeastOnceDelivery#deliver(destination:akka.actor.ActorPath)(deliveryIdToMessage:Long=%3EAny):Unit)]@java[@javadoc[deliver](akka.persistence.AbstractPersistentActorWithAtLeastOnceDelivery#deliver(akka.actor.ActorPath,akka.japi.Function))] and @scala[@scaladoc[confirmDelivery](akka.persistence.AtLeastOnceDelivery#confirmDelivery(deliveryId:Long):Boolean)]@java[@javadoc[confirmDelivery](akka.persistence.AtLeastOnceDeliveryLike#confirmDelivery(long))] invocations from your `PersistentActor` so that the state can
|
||||
be restored by calling the same methods during the recovery phase of the `PersistentActor`. Sometimes
|
||||
these events can be derived from other business level events, and sometimes you must create separate events.
|
||||
During recovery, calls to `deliver` will not send out messages, those will be sent later
|
||||
if no matching `confirmDelivery` will have been performed.
|
||||
|
||||
Support for snapshots is provided by `getDeliverySnapshot` and `setDeliverySnapshot`.
|
||||
The `AtLeastOnceDeliverySnapshot` contains the full delivery state, including unconfirmed messages.
|
||||
Support for snapshots is provided by @apidoc[getDeliverySnapshot](akka.persistence.AtLeastOnceDeliveryLike) {scala="#getDeliverySnapshot:akka.persistence.AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot" java="#getDeliverySnapshot()"} and @apidoc[setDeliverySnapshot](akka.persistence.AtLeastOnceDeliveryLike) {scala="#setDeliverySnapshot(snapshot:akka.persistence.AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot):Unit" java="#setDeliverySnapshot(akka.persistence.AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot)"}.
|
||||
The @apidoc[akka.persistence.AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot] contains the full delivery state, including unconfirmed messages.
|
||||
If you need a custom snapshot for other parts of the actor state you must also include the
|
||||
`AtLeastOnceDeliverySnapshot`. It is serialized using protobuf with the ordinary Akka
|
||||
serialization mechanism. It is easiest to include the bytes of the `AtLeastOnceDeliverySnapshot`
|
||||
as a blob in your custom snapshot.
|
||||
|
||||
The interval between redelivery attempts is defined by the `redeliverInterval` method.
|
||||
The interval between redelivery attempts is defined by the @apidoc[redeliverInterval](akka.persistence.AtLeastOnceDeliveryLike) {scala="#redeliverInterval:scala.concurrent.duration.FiniteDuration" java="#redeliverInterval()"} method.
|
||||
The default value can be configured with the `akka.persistence.at-least-once-delivery.redeliver-interval`
|
||||
configuration key. The method can be overridden by implementation classes to return non-default values.
|
||||
|
||||
The maximum number of messages that will be sent at each redelivery burst is defined by the
|
||||
`redeliveryBurstLimit` method (burst frequency is half of the redelivery interval). If there's a lot of
|
||||
@apidoc[redeliverBurstLimit](akka.persistence.AtLeastOnceDeliveryLike) {scala="#redeliveryBurstLimit:Int" java="#redeliveryBurstLimit()"} method (burst frequency is half of the redelivery interval). If there's a lot of
|
||||
unconfirmed messages (e.g. if the destination is not available for a long time), this helps to prevent an overwhelming
|
||||
amount of messages to be sent at once. The default value can be configured with the
|
||||
`akka.persistence.at-least-once-delivery.redelivery-burst-limit` configuration key. The method can be overridden
|
||||
by implementation classes to return non-default values.
|
||||
|
||||
After a number of delivery attempts a `AtLeastOnceDelivery.UnconfirmedWarning` message
|
||||
After a number of delivery attempts a @apidoc[akka.persistence.AtLeastOnceDelivery.UnconfirmedWarning] message
|
||||
will be sent to `self`. The re-sending will still continue, but you can choose to call
|
||||
`confirmDelivery` to cancel the re-sending. The number of delivery attempts before emitting the
|
||||
warning is defined by the `warnAfterNumberOfUnconfirmedAttempts` method. The default value can be
|
||||
warning is defined by the @apidoc[warnAfterNumberOfUnconfirmedAttempts](akka.persistence.AtLeastOnceDeliveryLike) {scala="#warnAfterNumberOfUnconfirmedAttempts:Int" java="#warnAfterNumberOfUnconfirmedAttempts()"} method. The default value can be
|
||||
configured with the `akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts`
|
||||
configuration key. The method can be overridden by implementation classes to return non-default values.
|
||||
|
||||
The @scala[`AtLeastOnceDelivery` trait]@java[`AbstractPersistentActorWithAtLeastOnceDelivery` class] holds messages in memory until their successful delivery has been confirmed.
|
||||
The @scala[@scaladoc[AtLeastOnceDelivery](akka.persistence.AtLeastOnceDelivery) trait]@java[@javadoc[AbstractPersistentActorWithAtLeastOnceDelivery](akka.persistence.AbstractPersistentActorWithAtLeastOnceDelivery) class] holds messages in memory until their successful delivery has been confirmed.
|
||||
The maximum number of unconfirmed messages that the actor is allowed to hold in memory
|
||||
is defined by the `maxUnconfirmedMessages` method. If this limit is exceed the `deliver` method will
|
||||
not accept more messages and it will throw `AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException`.
|
||||
is defined by the @apidoc[maxUnconfirmedMessages](akka.persistence.AtLeastOnceDeliveryLike) {scala="#maxUnconfirmedMessages:Int" java="#maxUnconfirmedMessages()"} method. If this limit is exceed the `deliver` method will
|
||||
not accept more messages and it will throw @apidoc[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException].
|
||||
The default value can be configured with the `akka.persistence.at-least-once-delivery.max-unconfirmed-messages`
|
||||
configuration key. The method can be overridden by implementation classes to return non-default values.
|
||||
|
||||
|
|
@ -775,7 +775,7 @@ Event Adapters help in situations where:
|
|||
|
||||
* **Version Migrations** – existing events stored in *Version 1* should be "upcasted" to a new *Version 2* representation,
|
||||
and the process of doing so involves actual code, not just changes on the serialization layer. For these scenarios
|
||||
the `toJournal` function is usually an identity function, however the `fromJournal` is implemented as
|
||||
the @apidoc[toJournal](akka.persistence.journal.WriteEventAdapter) {scala="#toJournal(event:Any):Any" java="#toJournal(java.lang.Object)"} function is usually an identity function, however the @apidoc[fromJournal](akka.persistence.journal.ReadEventAdapter) {scala="#fromJournal(event:Any,manifest:String):akka.persistence.journal.EventSeq" java="#fromJournal(java.lang.Object,java.lang.String)"} is implemented as
|
||||
`v1.Event=>v2.Event`, performing the necessary mapping inside the fromJournal method.
|
||||
This technique is sometimes referred to as "upcasting" in other CQRS libraries.
|
||||
* **Separating Domain and Data models** – thanks to EventAdapters it is possible to completely separate the domain model
|
||||
|
|
@ -802,7 +802,7 @@ It is possible to bind multiple adapters to one class *for recovery*, in which c
|
|||
bound adapters will be applied to a given matching event (in order of definition in the configuration). Since each adapter may
|
||||
return from `0` to `n` adapted events (called as `EventSeq`), each adapter can investigate the event and if it should
|
||||
indeed adapt it return the adapted event(s) for it. Other adapters which do not have anything to contribute during this
|
||||
adaptation simply return `EventSeq.empty`. The adapted events are then delivered in-order to the `PersistentActor` during replay.
|
||||
adaptation simply return @apidoc[EventSeq.empty](akka.persistence.journal.EventSeq$) {scala="#empty:akka.persistence.journal.EventSeq" java="#empty()"}. The adapted events are then delivered in-order to the `PersistentActor` during replay.
|
||||
|
||||
@@@ note
|
||||
|
||||
|
|
@ -832,7 +832,7 @@ The LevelDB journal is deprecated and will be removed from a future Akka version
|
|||
with it. For testing the built in "inmem" journal or the actual journal that will be used in production of the application
|
||||
is recommended. See @ref[Persistence Plugins](persistence-plugins.md) for some journal implementation choices.
|
||||
|
||||
When running tests with LevelDB default settings in `sbt`, make sure to set `fork := true` in your sbt project. Otherwise, you'll see an `UnsatisfiedLinkError`. Alternatively, you can switch to a LevelDB Java port by setting
|
||||
When running tests with LevelDB default settings in `sbt`, make sure to set `fork := true` in your sbt project. Otherwise, you'll see an @javadoc[UnsatisfiedLinkError](java.lang.UnsatisfiedLinkError). Alternatively, you can switch to a LevelDB Java port by setting
|
||||
|
||||
@@snip [PersistencePluginDocSpec.scala](/akka-docs/src/test/scala/docs/persistence/PersistencePluginDocSpec.scala) { #native-config }
|
||||
|
||||
|
|
@ -851,7 +851,7 @@ in your Akka configuration. Also note that for the LevelDB Java port, you will n
|
|||
@@@ warning
|
||||
|
||||
It is not possible to test persistence provided classes (i.e. `PersistentActor`
|
||||
and @ref:[AtLeastOnceDelivery](#at-least-once-delivery)) using `TestActorRef` due to its *synchronous* nature.
|
||||
and @ref:[AtLeastOnceDelivery](#at-least-once-delivery)) using @apidoc[TestActorRef] due to its *synchronous* nature.
|
||||
These traits need to be able to perform asynchronous tasks in the background in order to handle internal persistence
|
||||
related events.
|
||||
|
||||
|
|
@ -874,7 +874,7 @@ configured in the following sections of the `reference.conf` configuration resou
|
|||
|
||||
@@snip [PersistenceMultiDocSpec.scala](/akka-docs/src/test/scala/docs/persistence/PersistenceMultiDocSpec.scala) { #default-config }
|
||||
|
||||
Note that in this case the actor overrides only the `persistenceId` method:
|
||||
Note that in this case the actor overrides only the @apidoc[persistenceId](akka.persistence.PersistenceIdentity) {scala="#persistenceId:String" java="#persistenceId()"} method:
|
||||
|
||||
Scala
|
||||
: @@snip [PersistenceMultiDocSpec.scala](/akka-docs/src/test/scala/docs/persistence/PersistenceMultiDocSpec.scala) { #default-plugins }
|
||||
|
|
@ -882,7 +882,7 @@ Scala
|
|||
Java
|
||||
: @@snip [PersistenceMultiDocTest.java](/akka-docs/src/test/java/jdocs/persistence/PersistenceMultiDocTest.java) { #default-plugins }
|
||||
|
||||
When the persistent actor overrides the `journalPluginId` and `snapshotPluginId` methods,
|
||||
When the persistent actor overrides the @apidoc[journalPluginId](akka.persistence.PersistenceIdentity) {scala="#journalPluginId:String" java="#journalPluginId()"} and @apidoc[snapshotPluginId](akka.persistence.PersistenceIdentity) {scala="#snapshotPluginId:String" java="#snapshotPluginId()"} methods,
|
||||
the actor will be serviced by these specific persistence plugins instead of the defaults:
|
||||
|
||||
Scala
|
||||
|
|
@ -898,10 +898,10 @@ plugin entries with a standard `class` property as well as settings which are sp
|
|||
|
||||
## Give persistence plugin configurations at runtime
|
||||
|
||||
By default, a persistent actor will use the configuration loaded at `ActorSystem` creation time to create journal and snapshot store plugins.
|
||||
By default, a persistent actor will use the configuration loaded at @apidoc[akka.actor.ActorSystem] creation time to create journal and snapshot store plugins.
|
||||
|
||||
When the persistent actor overrides the `journalPluginConfig` and `snapshotPluginConfig` methods,
|
||||
the actor will use the declared `Config` objects with a fallback on the default configuration.
|
||||
When the persistent actor overrides the @apidoc[journalPluginConfig](akka.persistence.RuntimePluginConfig) {scala="#journalPluginConfig:com.typesafe.config.Config" java="#journalPluginConfig()"} and @apidoc[snapshotPluginConfig](akka.persistence.RuntimePluginConfig) {scala="#snapshotPluginConfig:com.typesafe.config.Config" java="#snapshotPluginConfig()"} methods,
|
||||
the actor will use the declared @javadoc[Config](com.typesafe.config.Config) objects with a fallback on the default configuration.
|
||||
It allows a dynamic configuration of the journal and the snapshot store at runtime:
|
||||
|
||||
Scala
|
||||
|
|
|
|||
|
|
@ -41,6 +41,8 @@ object Paradox {
|
|||
"javadoc.com.fasterxml.jackson.databind.link_style" -> "direct",
|
||||
"javadoc.com.google.protobuf.base_url" -> "https://javadoc.io/doc/com.google.protobuf/protobuf-java/latest/",
|
||||
"javadoc.com.google.protobuf.link_style" -> "direct",
|
||||
"javadoc.com.typesafe.config.base_url" -> "https://javadoc.io/doc/com.typesafe/config/latest/",
|
||||
"javadoc.com.typesafe.config.link_style" -> "direct",
|
||||
"scala.version" -> scalaVersion.value,
|
||||
"scala.binary.version" -> scalaBinaryVersion.value,
|
||||
"akka.version" -> version.value,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue