+per #18559 TCK tests for highestSequenceNr not reset after journal message deletion
* added tests * docs about highestSeqenceNr behaviour after message deletion * clarification about highestSequenceNr in AsyncRecovery#asyncReadHighestSequenceNr and AsyncWriteJournal#asyncDeleteMessagesTo
This commit is contained in:
parent
b5412a1ee0
commit
4eaa77608c
5 changed files with 114 additions and 83 deletions
|
|
@ -34,7 +34,7 @@ Akka persistence is a separate jar file. Make sure that you have the following d
|
||||||
<version>@version@</version>
|
<version>@version@</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
Akka persistence extension comes with few built-in persistence plugins, including
|
Akka persistence extension comes with few built-in persistence plugins, including
|
||||||
in-memory heap based journal, local file-system based snapshot-store and LevelDB based journal.
|
in-memory heap based journal, local file-system based snapshot-store and LevelDB based journal.
|
||||||
|
|
||||||
LevelDB based plugins will require the following additional dependency declaration::
|
LevelDB based plugins will require the following additional dependency declaration::
|
||||||
|
|
@ -66,12 +66,12 @@ Architecture
|
||||||
case of sender and receiver JVM crashes.
|
case of sender and receiver JVM crashes.
|
||||||
|
|
||||||
* *AsyncWriteJournal*: A journal stores the sequence of messages sent to a persistent actor. An application can control which messages
|
* *AsyncWriteJournal*: A journal stores the sequence of messages sent to a persistent actor. An application can control which messages
|
||||||
are journaled and which are received by the persistent actor without being journaled. The storage backend of a journal is pluggable.
|
are journaled and which are received by the persistent actor without being journaled. Journal maintains *highestSequenceNr* that is increased on each message.
|
||||||
Persistence extension comes with a "leveldb" journal plugin, which writes to the local filesystem,
|
The storage backend of a journal is pluggable. Persistence extension comes with a "leveldb" journal plugin, which writes to the local filesystem,
|
||||||
and replicated journals are available as `Community plugins`_.
|
and replicated journals are available as `Community plugins`_.
|
||||||
|
|
||||||
* *Snapshot store*: A snapshot store persists snapshots of a persistent actor's or a view's internal state. Snapshots are
|
* *Snapshot store*: A snapshot store persists snapshots of a persistent actor's or a view's internal state. Snapshots are
|
||||||
used for optimizing recovery times. The storage backend of a snapshot store is pluggable.
|
used for optimizing recovery times. The storage backend of a snapshot store is pluggable.
|
||||||
Persistence extension comes with a "local" snapshot storage plugin, which writes to the local filesystem,
|
Persistence extension comes with a "local" snapshot storage plugin, which writes to the local filesystem,
|
||||||
and replicated snapshot stores are available as `Community plugins`_.
|
and replicated snapshot stores are available as `Community plugins`_.
|
||||||
|
|
||||||
|
|
@ -135,7 +135,7 @@ don't consume too much memory.
|
||||||
|
|
||||||
If persistence of an event fails, ``onPersistFailure`` will be invoked (logging the error by default)
|
If persistence of an event fails, ``onPersistFailure`` will be invoked (logging the error by default)
|
||||||
and the actor will unconditionally be stopped. If persistence of an event is rejected before it is
|
and the actor will unconditionally be stopped. If persistence of an event is rejected before it is
|
||||||
stored, e.g. due to serialization error, ``onPersistRejected`` will be invoked (logging a warning
|
stored, e.g. due to serialization error, ``onPersistRejected`` will be invoked (logging a warning
|
||||||
by default) and the actor continues with next message.
|
by default) and the actor continues with next message.
|
||||||
|
|
||||||
The easiest way to run this example yourself is to download `Typesafe Activator <http://www.typesafe.com/platform/getstarted>`_
|
The easiest way to run this example yourself is to download `Typesafe Activator <http://www.typesafe.com/platform/getstarted>`_
|
||||||
|
|
@ -166,7 +166,7 @@ Recovery
|
||||||
--------
|
--------
|
||||||
|
|
||||||
By default, a persistent actor is automatically recovered on start and on restart by replaying journaled messages.
|
By default, a persistent actor is automatically recovered on start and on restart by replaying journaled messages.
|
||||||
New messages sent to a persistent actor during recovery do not interfere with replayed messages.
|
New messages sent to a persistent actor during recovery do not interfere with replayed messages.
|
||||||
They are cached and received by a persistent actor after recovery phase completes.
|
They are cached and received by a persistent actor after recovery phase completes.
|
||||||
|
|
||||||
.. note::
|
.. note::
|
||||||
|
|
@ -202,7 +202,7 @@ and before any other received messages.
|
||||||
.. includecode:: code/docs/persistence/PersistenceDocTest.java#recovery-completed
|
.. includecode:: code/docs/persistence/PersistenceDocTest.java#recovery-completed
|
||||||
|
|
||||||
If there is a problem with recovering the state of the actor from the journal, ``onRecoveryFailure``
|
If there is a problem with recovering the state of the actor from the journal, ``onRecoveryFailure``
|
||||||
is called (logging the error by default) and the actor will be stopped.
|
is called (logging the error by default) and the actor will be stopped.
|
||||||
|
|
||||||
.. _persist-async-java:
|
.. _persist-async-java:
|
||||||
|
|
||||||
|
|
@ -227,10 +227,10 @@ The ordering between events is still guaranteed ("evt-b-1" will be sent after "e
|
||||||
.. note::
|
.. note::
|
||||||
In order to implement the pattern known as "*command sourcing*" simply ``persistAsync`` all incoming messages right away,
|
In order to implement the pattern known as "*command sourcing*" simply ``persistAsync`` all incoming messages right away,
|
||||||
and handle them in the callback.
|
and handle them in the callback.
|
||||||
|
|
||||||
.. warning::
|
.. warning::
|
||||||
The callback will not be invoked if the actor is restarted (or stopped) in between the call to
|
The callback will not be invoked if the actor is restarted (or stopped) in between the call to
|
||||||
``persistAsync`` and the journal has confirmed the write.
|
``persistAsync`` and the journal has confirmed the write.
|
||||||
|
|
||||||
.. _defer-java:
|
.. _defer-java:
|
||||||
|
|
||||||
|
|
@ -297,17 +297,17 @@ Failures
|
||||||
--------
|
--------
|
||||||
|
|
||||||
If persistence of an event fails, ``onPersistFailure`` will be invoked (logging the error by default)
|
If persistence of an event fails, ``onPersistFailure`` will be invoked (logging the error by default)
|
||||||
and the actor will unconditionally be stopped.
|
and the actor will unconditionally be stopped.
|
||||||
|
|
||||||
The reason that it cannot resume when persist fails is that it is unknown if the even was actually
|
The reason that it cannot resume when persist fails is that it is unknown if the even was actually
|
||||||
persisted or not, and therefore it is in an inconsistent state. Restarting on persistent failures
|
persisted or not, and therefore it is in an inconsistent state. Restarting on persistent failures
|
||||||
will most likely fail anyway, since the journal is probably unavailable. It is better to stop the
|
will most likely fail anyway, since the journal is probably unavailable. It is better to stop the
|
||||||
actor and after a back-off timeout start it again. The ``akka.pattern.BackoffSupervisor`` actor
|
actor and after a back-off timeout start it again. The ``akka.pattern.BackoffSupervisor`` actor
|
||||||
is provided to support such restarts.
|
is provided to support such restarts.
|
||||||
|
|
||||||
.. includecode:: code/docs/persistence/PersistenceDocTest.java#backoff
|
.. includecode:: code/docs/persistence/PersistenceDocTest.java#backoff
|
||||||
|
|
||||||
If persistence of an event is rejected before it is stored, e.g. due to serialization error,
|
If persistence of an event is rejected before it is stored, e.g. due to serialization error,
|
||||||
``onPersistRejected`` will be invoked (logging a warning by default) and the actor continues with
|
``onPersistRejected`` will be invoked (logging a warning by default) and the actor continues with
|
||||||
next message.
|
next message.
|
||||||
|
|
||||||
|
|
@ -319,7 +319,7 @@ Atomic writes
|
||||||
|
|
||||||
Each event is of course stored atomically, but it is also possible to store several events atomically by
|
Each event is of course stored atomically, but it is also possible to store several events atomically by
|
||||||
using the ``persistAll`` or ``persistAllAsync`` method. That means that all events passed to that method
|
using the ``persistAll`` or ``persistAllAsync`` method. That means that all events passed to that method
|
||||||
are stored or none of them are stored if there is an error.
|
are stored or none of them are stored if there is an error.
|
||||||
|
|
||||||
The recovery of a persistent actor will therefore never be done partially with only a subset of events persisted by
|
The recovery of a persistent actor will therefore never be done partially with only a subset of events persisted by
|
||||||
`persistAll`.
|
`persistAll`.
|
||||||
|
|
@ -351,9 +351,11 @@ Deleting messages in event sourcing based applications is typically either not u
|
||||||
up until the sequence number of the data held by that snapshot can be issued, to safely delete the previous events,
|
up until the sequence number of the data held by that snapshot can be issued, to safely delete the previous events,
|
||||||
while still having access to the accumulated state during replays - by loading the snapshot.
|
while still having access to the accumulated state during replays - by loading the snapshot.
|
||||||
|
|
||||||
The result of the ``deleteMessages`` request is signaled to the persistent actor with a ``DeleteMessagesSuccess``
|
The result of the ``deleteMessages`` request is signaled to the persistent actor with a ``DeleteMessagesSuccess``
|
||||||
message if the delete was successful or a ``DeleteMessagesFailure`` message if it failed.
|
message if the delete was successful or a ``DeleteMessagesFailure`` message if it failed.
|
||||||
|
|
||||||
|
Message deletion doesn't affect highest sequence number of journal, even if all messages were deleted from journal after ``deleteMessages`` invocation.
|
||||||
|
|
||||||
Persistence status handling
|
Persistence status handling
|
||||||
---------------------------
|
---------------------------
|
||||||
Persisting, deleting and replaying messages can either succeed or fail.
|
Persisting, deleting and replaying messages can either succeed or fail.
|
||||||
|
|
@ -428,13 +430,13 @@ Persistent Views
|
||||||
``UntypedPersistentView`` is deprecated. Use :ref:`persistence-query-java` instead. The corresponding
|
``UntypedPersistentView`` is deprecated. Use :ref:`persistence-query-java` instead. The corresponding
|
||||||
query type is ``EventsByPersistenceId``. There are several alternatives for connecting the ``Source``
|
query type is ``EventsByPersistenceId``. There are several alternatives for connecting the ``Source``
|
||||||
to an actor corresponding to a previous ``UntypedPersistentView`` actor:
|
to an actor corresponding to a previous ``UntypedPersistentView`` actor:
|
||||||
|
|
||||||
* `Sink.actorRef`_ is simple, but has the disadvantage that there is no back-pressure signal from the
|
* `Sink.actorRef`_ is simple, but has the disadvantage that there is no back-pressure signal from the
|
||||||
destination actor, i.e. if the actor is not consuming the messages fast enough the mailbox of the actor will grow
|
destination actor, i.e. if the actor is not consuming the messages fast enough the mailbox of the actor will grow
|
||||||
* `mapAsync`_ combined with :ref:`actors-ask-lambda` is almost as simple with the advantage of back-pressure
|
* `mapAsync`_ combined with :ref:`actors-ask-lambda` is almost as simple with the advantage of back-pressure
|
||||||
being propagated all the way
|
being propagated all the way
|
||||||
* `ActorSubscriber`_ in case you need more fine grained control
|
* `ActorSubscriber`_ in case you need more fine grained control
|
||||||
|
|
||||||
The consuming actor may be a plain ``UntypedActor`` or an ``UntypedPersistentActor`` if it needs to store its
|
The consuming actor may be a plain ``UntypedActor`` or an ``UntypedPersistentActor`` if it needs to store its
|
||||||
own state (e.g. fromSequenceNr offset).
|
own state (e.g. fromSequenceNr offset).
|
||||||
|
|
||||||
|
|
@ -538,7 +540,7 @@ saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay a
|
||||||
Since it is acceptable for some applications to not use any snapshotting, it is legal to not configure a snapshot store,
|
Since it is acceptable for some applications to not use any snapshotting, it is legal to not configure a snapshot store,
|
||||||
however Akka will log a warning message when this situation is detected and then continue to operate until
|
however Akka will log a warning message when this situation is detected and then continue to operate until
|
||||||
an actor tries to store a snapshot, at which point the the operation will fail (by replying with an ``SaveSnapshotFailure`` for example).
|
an actor tries to store a snapshot, at which point the the operation will fail (by replying with an ``SaveSnapshotFailure`` for example).
|
||||||
|
|
||||||
Note that :ref:`cluster_sharding_java` is using snapshots, so if you use Cluster Sharding you need to define a snapshot store plugin.
|
Note that :ref:`cluster_sharding_java` is using snapshots, so if you use Cluster Sharding you need to define a snapshot store plugin.
|
||||||
|
|
||||||
Snapshot deletion
|
Snapshot deletion
|
||||||
|
|
@ -575,7 +577,7 @@ have not been confirmed within a configurable timeout.
|
||||||
|
|
||||||
The state of the sending actor, including which messages that have been sent and still not been
|
The state of the sending actor, including which messages that have been sent and still not been
|
||||||
confirmed by the recepient, must be persistent so that it can survive a crash of the sending actor
|
confirmed by the recepient, must be persistent so that it can survive a crash of the sending actor
|
||||||
or JVM. The ``UntypedPersistentActorWithAtLeastOnceDelivery`` class does not persist anything by itself.
|
or JVM. The ``UntypedPersistentActorWithAtLeastOnceDelivery`` class does not persist anything by itself.
|
||||||
It is your responsibility to persist the intent that a message is sent and that a confirmation has been
|
It is your responsibility to persist the intent that a message is sent and that a confirmation has been
|
||||||
received.
|
received.
|
||||||
|
|
||||||
|
|
@ -605,30 +607,30 @@ Relationship between deliver and confirmDelivery
|
||||||
------------------------------------------------
|
------------------------------------------------
|
||||||
|
|
||||||
To send messages to the destination path, use the ``deliver`` method after you have persisted the intent
|
To send messages to the destination path, use the ``deliver`` method after you have persisted the intent
|
||||||
to send the message.
|
to send the message.
|
||||||
|
|
||||||
The destination actor must send back a confirmation message. When the sending actor receives this
|
The destination actor must send back a confirmation message. When the sending actor receives this
|
||||||
confirmation message you should persist the fact that the message was delivered successfully and then call
|
confirmation message you should persist the fact that the message was delivered successfully and then call
|
||||||
the ``confirmDelivery`` method.
|
the ``confirmDelivery`` method.
|
||||||
|
|
||||||
If the persistent actor is not currently recovering, the ``deliver`` method will send the message to
|
If the persistent actor is not currently recovering, the ``deliver`` method will send the message to
|
||||||
the destination actor. When recovering, messages will be buffered until they have been confirmed using ``confirmDelivery``.
|
the destination actor. When recovering, messages will be buffered until they have been confirmed using ``confirmDelivery``.
|
||||||
Once recovery has completed, if there are outstanding messages that have not been confirmed (during the message replay),
|
Once recovery has completed, if there are outstanding messages that have not been confirmed (during the message replay),
|
||||||
the persistent actor will resend these before sending any other messages.
|
the persistent actor will resend these before sending any other messages.
|
||||||
|
|
||||||
Deliver requires a ``deliveryIdToMessage`` function to pass the provided ``deliveryId`` into the message so that correlation
|
Deliver requires a ``deliveryIdToMessage`` function to pass the provided ``deliveryId`` into the message so that correlation
|
||||||
between ``deliver`` and ``confirmDelivery`` is possible. The ``deliveryId`` must do the round trip. Upon receipt
|
between ``deliver`` and ``confirmDelivery`` is possible. The ``deliveryId`` must do the round trip. Upon receipt
|
||||||
of the message, destination actor will send the same``deliveryId`` wrapped in a confirmation message back to the sender.
|
of the message, destination actor will send the same``deliveryId`` wrapped in a confirmation message back to the sender.
|
||||||
The sender will then use it to call ``confirmDelivery`` method to complete delivery routine.
|
The sender will then use it to call ``confirmDelivery`` method to complete delivery routine.
|
||||||
|
|
||||||
.. includecode:: code/docs/persistence/PersistenceDocTest.java#at-least-once-example
|
.. includecode:: code/docs/persistence/PersistenceDocTest.java#at-least-once-example
|
||||||
|
|
||||||
The ``deliveryId`` generated by the persistence module is a strictly monotonically increasing sequence number
|
The ``deliveryId`` generated by the persistence module is a strictly monotonically increasing sequence number
|
||||||
without gaps. The same sequence is used for all destinations of the actor, i.e. when sending to multiple
|
without gaps. The same sequence is used for all destinations of the actor, i.e. when sending to multiple
|
||||||
destinations the destinations will see gaps in the sequence. It is not possible to use custom ``deliveryId``.
|
destinations the destinations will see gaps in the sequence. It is not possible to use custom ``deliveryId``.
|
||||||
However, you can send a custom correlation identifier in the message to the destination. You must then retain
|
However, you can send a custom correlation identifier in the message to the destination. You must then retain
|
||||||
a mapping between the internal ``deliveryId`` (passed into the ``deliveryIdToMessage`` function) and your custom
|
a mapping between the internal ``deliveryId`` (passed into the ``deliveryIdToMessage`` function) and your custom
|
||||||
correlation id (passed into the message). You can do this by storing such mapping in a ``Map(correlationId -> deliveryId)``
|
correlation id (passed into the message). You can do this by storing such mapping in a ``Map(correlationId -> deliveryId)``
|
||||||
from which you can retrieve the ``deliveryId`` to be passed into the ``confirmDelivery`` method once the receiver
|
from which you can retrieve the ``deliveryId`` to be passed into the ``confirmDelivery`` method once the receiver
|
||||||
of your message has replied with your custom correlation id.
|
of your message has replied with your custom correlation id.
|
||||||
|
|
||||||
|
|
@ -643,7 +645,7 @@ if no matching ``confirmDelivery`` was performed.
|
||||||
Support for snapshots is provided by ``getDeliverySnapshot`` and ``setDeliverySnapshot``.
|
Support for snapshots is provided by ``getDeliverySnapshot`` and ``setDeliverySnapshot``.
|
||||||
The ``AtLeastOnceDeliverySnapshot`` contains the full delivery state, including unconfirmed messages.
|
The ``AtLeastOnceDeliverySnapshot`` contains the full delivery state, including unconfirmed messages.
|
||||||
If you need a custom snapshot for other parts of the actor state you must also include the
|
If you need a custom snapshot for other parts of the actor state you must also include the
|
||||||
``AtLeastOnceDeliverySnapshot``. It is serialized using protobuf with the ordinary Akka
|
``AtLeastOnceDeliverySnapshot``. It is serialized using protobuf with the ordinary Akka
|
||||||
serialization mechanism. It is easiest to include the bytes of the ``AtLeastOnceDeliverySnapshot``
|
serialization mechanism. It is easiest to include the bytes of the ``AtLeastOnceDeliverySnapshot``
|
||||||
as a blob in your custom snapshot.
|
as a blob in your custom snapshot.
|
||||||
|
|
||||||
|
|
@ -729,10 +731,10 @@ persistence extension will use "default" journal and snapshot-store plugins conf
|
||||||
akka.persistence.snapshot-store.plugin = ""
|
akka.persistence.snapshot-store.plugin = ""
|
||||||
|
|
||||||
However, these entries are provided as empty "", and require explicit user configuration via override in the user ``application.conf``.
|
However, these entries are provided as empty "", and require explicit user configuration via override in the user ``application.conf``.
|
||||||
For an example of journal plugin which writes messages to LevelDB see :ref:`local-leveldb-journal-java`.
|
For an example of journal plugin which writes messages to LevelDB see :ref:`local-leveldb-journal-java`.
|
||||||
For an example of snapshot store plugin which writes snapshots as individual files to the local filesystem see :ref:`local-snapshot-store-java`.
|
For an example of snapshot store plugin which writes snapshots as individual files to the local filesystem see :ref:`local-snapshot-store-java`.
|
||||||
|
|
||||||
Applications can provide their own plugins by implementing a plugin API and activate them by configuration.
|
Applications can provide their own plugins by implementing a plugin API and activate them by configuration.
|
||||||
Plugin development requires the following imports:
|
Plugin development requires the following imports:
|
||||||
|
|
||||||
.. includecode:: code/docs/persistence/PersistencePluginDocTest.java#plugin-imports
|
.. includecode:: code/docs/persistence/PersistencePluginDocTest.java#plugin-imports
|
||||||
|
|
@ -765,10 +767,10 @@ used for the plugin actor. If not specified, it defaults to ``akka.persistence.d
|
||||||
|
|
||||||
The journal plugin instance is an actor so the methods corresponding to requests from persistent actors
|
The journal plugin instance is an actor so the methods corresponding to requests from persistent actors
|
||||||
are executed sequentially. It may delegate to asynchronous libraries, spawn futures, or delegate to other
|
are executed sequentially. It may delegate to asynchronous libraries, spawn futures, or delegate to other
|
||||||
actors to achive parallelism.
|
actors to achive parallelism.
|
||||||
|
|
||||||
The journal plugin class must have a constructor without parameters or constructor with one ``com.typesafe.config.Config``
|
The journal plugin class must have a constructor without parameters or constructor with one ``com.typesafe.config.Config``
|
||||||
parameter. The plugin section of the actor system's config will be passed in the config constructor parameter.
|
parameter. The plugin section of the actor system's config will be passed in the config constructor parameter.
|
||||||
|
|
||||||
Don't run journal tasks/futures on the system default dispatcher, since that might starve other tasks.
|
Don't run journal tasks/futures on the system default dispatcher, since that might starve other tasks.
|
||||||
|
|
||||||
|
|
@ -842,7 +844,7 @@ instance. Enable this plugin by defining config property:
|
||||||
|
|
||||||
.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#leveldb-plugin-config
|
.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#leveldb-plugin-config
|
||||||
|
|
||||||
LevelDB based plugins will also require the following additional dependency declaration::
|
LevelDB based plugins will also require the following additional dependency declaration::
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.iq80.leveldb</groupId>
|
<groupId>org.iq80.leveldb</groupId>
|
||||||
|
|
@ -906,7 +908,7 @@ Local snapshot store
|
||||||
Local snapshot store plugin config entry is ``akka.persistence.snapshot-store.local`` and it writes snapshot files to
|
Local snapshot store plugin config entry is ``akka.persistence.snapshot-store.local`` and it writes snapshot files to
|
||||||
the local filesystem. Enable this plugin by defining config property:
|
the local filesystem. Enable this plugin by defining config property:
|
||||||
|
|
||||||
.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#leveldb-snapshot-plugin-config
|
.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#leveldb-snapshot-plugin-config
|
||||||
|
|
||||||
The default storage location is a directory named ``snapshots`` in the current working
|
The default storage location is a directory named ``snapshots`` in the current working
|
||||||
directory. This can be changed by configuration where the specified path can be relative or absolute:
|
directory. This can be changed by configuration where the specified path can be relative or absolute:
|
||||||
|
|
@ -973,7 +975,7 @@ Note that in this case actor or view overrides only ``persistenceId`` method:
|
||||||
|
|
||||||
.. includecode:: ../java/code/docs/persistence/PersistenceMultiDocTest.java#default-plugins
|
.. includecode:: ../java/code/docs/persistence/PersistenceMultiDocTest.java#default-plugins
|
||||||
|
|
||||||
When persistent actor or view overrides ``journalPluginId`` and ``snapshotPluginId`` methods,
|
When persistent actor or view overrides ``journalPluginId`` and ``snapshotPluginId`` methods,
|
||||||
the actor or view will be serviced by these specific persistence plugins instead of the defaults:
|
the actor or view will be serviced by these specific persistence plugins instead of the defaults:
|
||||||
|
|
||||||
.. includecode:: ../java/code/docs/persistence/PersistenceMultiDocTest.java#override-plugins
|
.. includecode:: ../java/code/docs/persistence/PersistenceMultiDocTest.java#override-plugins
|
||||||
|
|
|
||||||
|
|
@ -26,7 +26,7 @@ Akka persistence is a separate jar file. Make sure that you have the following d
|
||||||
|
|
||||||
"com.typesafe.akka" %% "akka-persistence" % "@version@" @crossString@
|
"com.typesafe.akka" %% "akka-persistence" % "@version@" @crossString@
|
||||||
|
|
||||||
Akka persistence extension comes with few built-in persistence plugins, including
|
Akka persistence extension comes with few built-in persistence plugins, including
|
||||||
in-memory heap based journal, local file-system based snapshot-store and LevelDB based journal.
|
in-memory heap based journal, local file-system based snapshot-store and LevelDB based journal.
|
||||||
|
|
||||||
LevelDB based plugins will require the following additional dependency declaration::
|
LevelDB based plugins will require the following additional dependency declaration::
|
||||||
|
|
@ -50,12 +50,12 @@ Architecture
|
||||||
case of sender and receiver JVM crashes.
|
case of sender and receiver JVM crashes.
|
||||||
|
|
||||||
* *AsyncWriteJournal*: A journal stores the sequence of messages sent to a persistent actor. An application can control which messages
|
* *AsyncWriteJournal*: A journal stores the sequence of messages sent to a persistent actor. An application can control which messages
|
||||||
are journaled and which are received by the persistent actor without being journaled. The storage backend of a journal is pluggable.
|
are journaled and which are received by the persistent actor without being journaled. Journal maintains *highestSequenceNr* that is increased on each message.
|
||||||
Persistence extension comes with a "leveldb" journal plugin, which writes to the local filesystem,
|
The storage backend of a journal is pluggable. Persistence extension comes with a "leveldb" journal plugin, which writes to the local filesystem,
|
||||||
and replicated journals are available as `Community plugins`_.
|
and replicated journals are available as `Community plugins`_.
|
||||||
|
|
||||||
* *Snapshot store*: A snapshot store persists snapshots of a persistent actor's or a view's internal state. Snapshots are
|
* *Snapshot store*: A snapshot store persists snapshots of a persistent actor's or a view's internal state. Snapshots are
|
||||||
used for optimizing recovery times. The storage backend of a snapshot store is pluggable.
|
used for optimizing recovery times. The storage backend of a snapshot store is pluggable.
|
||||||
Persistence extension comes with a "local" snapshot storage plugin, which writes to the local filesystem,
|
Persistence extension comes with a "local" snapshot storage plugin, which writes to the local filesystem,
|
||||||
and replicated snapshot stores are available as `Community plugins`_.
|
and replicated snapshot stores are available as `Community plugins`_.
|
||||||
|
|
||||||
|
|
@ -107,7 +107,7 @@ is completed. You should be careful to not send more messages to a persistent ac
|
||||||
otherwise the number of stashed messages will grow. It can be wise to protect against `OutOfMemoryError`
|
otherwise the number of stashed messages will grow. It can be wise to protect against `OutOfMemoryError`
|
||||||
by defining a maximum stash capacity in the mailbox configuration::
|
by defining a maximum stash capacity in the mailbox configuration::
|
||||||
|
|
||||||
akka.actor.default-mailbox.stash-capacity=10000
|
akka.actor.default-mailbox.stash-capacity=10000
|
||||||
|
|
||||||
If the stash capacity is exceeded for an actor the stashed messages are discarded and a
|
If the stash capacity is exceeded for an actor the stashed messages are discarded and a
|
||||||
``MessageQueueAppendFailedException`` is thrown, causing actor restart if default supervision
|
``MessageQueueAppendFailedException`` is thrown, causing actor restart if default supervision
|
||||||
|
|
@ -119,7 +119,7 @@ don't consume too much memory.
|
||||||
|
|
||||||
If persistence of an event fails, ``onPersistFailure`` will be invoked (logging the error by default)
|
If persistence of an event fails, ``onPersistFailure`` will be invoked (logging the error by default)
|
||||||
and the actor will unconditionally be stopped. If persistence of an event is rejected before it is
|
and the actor will unconditionally be stopped. If persistence of an event is rejected before it is
|
||||||
stored, e.g. due to serialization error, ``onPersistRejected`` will be invoked (logging a warning
|
stored, e.g. due to serialization error, ``onPersistRejected`` will be invoked (logging a warning
|
||||||
by default) and the actor continues with next message.
|
by default) and the actor continues with next message.
|
||||||
|
|
||||||
The easiest way to run this example yourself is to download `Typesafe Activator <http://www.typesafe.com/platform/getstarted>`_
|
The easiest way to run this example yourself is to download `Typesafe Activator <http://www.typesafe.com/platform/getstarted>`_
|
||||||
|
|
@ -149,7 +149,7 @@ Recovery
|
||||||
--------
|
--------
|
||||||
|
|
||||||
By default, a persistent actor is automatically recovered on start and on restart by replaying journaled messages.
|
By default, a persistent actor is automatically recovered on start and on restart by replaying journaled messages.
|
||||||
New messages sent to a persistent actor during recovery do not interfere with replayed messages.
|
New messages sent to a persistent actor during recovery do not interfere with replayed messages.
|
||||||
They are cached and received by a persistent actor after recovery phase completes.
|
They are cached and received by a persistent actor after recovery phase completes.
|
||||||
|
|
||||||
.. note::
|
.. note::
|
||||||
|
|
@ -185,7 +185,7 @@ and before any other received messages.
|
||||||
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recovery-completed
|
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recovery-completed
|
||||||
|
|
||||||
If there is a problem with recovering the state of the actor from the journal, ``onRecoveryFailure``
|
If there is a problem with recovering the state of the actor from the journal, ``onRecoveryFailure``
|
||||||
is called (logging the error by default) and the actor will be stopped.
|
is called (logging the error by default) and the actor will be stopped.
|
||||||
|
|
||||||
.. _persist-async-scala:
|
.. _persist-async-scala:
|
||||||
|
|
||||||
|
|
@ -210,7 +210,7 @@ The ordering between events is still guaranteed ("evt-b-1" will be sent after "e
|
||||||
.. note::
|
.. note::
|
||||||
In order to implement the pattern known as "*command sourcing*" simply call ``persistAsync(cmd)(...)`` right away on all incoming
|
In order to implement the pattern known as "*command sourcing*" simply call ``persistAsync(cmd)(...)`` right away on all incoming
|
||||||
messages, and handle them in the callback.
|
messages, and handle them in the callback.
|
||||||
|
|
||||||
.. warning::
|
.. warning::
|
||||||
The callback will not be invoked if the actor is restarted (or stopped) in between the call to
|
The callback will not be invoked if the actor is restarted (or stopped) in between the call to
|
||||||
``persistAsync`` and the journal has confirmed the write.
|
``persistAsync`` and the journal has confirmed the write.
|
||||||
|
|
@ -282,17 +282,17 @@ Failures
|
||||||
--------
|
--------
|
||||||
|
|
||||||
If persistence of an event fails, ``onPersistFailure`` will be invoked (logging the error by default)
|
If persistence of an event fails, ``onPersistFailure`` will be invoked (logging the error by default)
|
||||||
and the actor will unconditionally be stopped.
|
and the actor will unconditionally be stopped.
|
||||||
|
|
||||||
The reason that it cannot resume when persist fails is that it is unknown if the even was actually
|
The reason that it cannot resume when persist fails is that it is unknown if the even was actually
|
||||||
persisted or not, and therefore it is in an inconsistent state. Restarting on persistent failures
|
persisted or not, and therefore it is in an inconsistent state. Restarting on persistent failures
|
||||||
will most likely fail anyway, since the journal is probably unavailable. It is better to stop the
|
will most likely fail anyway, since the journal is probably unavailable. It is better to stop the
|
||||||
actor and after a back-off timeout start it again. The ``akka.pattern.BackoffSupervisor`` actor
|
actor and after a back-off timeout start it again. The ``akka.pattern.BackoffSupervisor`` actor
|
||||||
is provided to support such restarts.
|
is provided to support such restarts.
|
||||||
|
|
||||||
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#backoff
|
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#backoff
|
||||||
|
|
||||||
If persistence of an event is rejected before it is stored, e.g. due to serialization error,
|
If persistence of an event is rejected before it is stored, e.g. due to serialization error,
|
||||||
``onPersistRejected`` will be invoked (logging a warning by default) and the actor continues with
|
``onPersistRejected`` will be invoked (logging a warning by default) and the actor continues with
|
||||||
next message.
|
next message.
|
||||||
|
|
||||||
|
|
@ -304,7 +304,7 @@ Atomic writes
|
||||||
|
|
||||||
Each event is of course stored atomically, but it is also possible to store several events atomically by
|
Each event is of course stored atomically, but it is also possible to store several events atomically by
|
||||||
using the ``persistAll`` or ``persistAllAsync`` method. That means that all events passed to that method
|
using the ``persistAll`` or ``persistAllAsync`` method. That means that all events passed to that method
|
||||||
are stored or none of them are stored if there is an error.
|
are stored or none of them are stored if there is an error.
|
||||||
|
|
||||||
The recovery of a persistent actor will therefore never be done partially with only a subset of events persisted by
|
The recovery of a persistent actor will therefore never be done partially with only a subset of events persisted by
|
||||||
`persistAll`.
|
`persistAll`.
|
||||||
|
|
@ -338,9 +338,11 @@ Deleting messages in event sourcing based applications is typically either not u
|
||||||
up until the sequence number of the data held by that snapshot can be issued, to safely delete the previous events,
|
up until the sequence number of the data held by that snapshot can be issued, to safely delete the previous events,
|
||||||
while still having access to the accumulated state during replays - by loading the snapshot.
|
while still having access to the accumulated state during replays - by loading the snapshot.
|
||||||
|
|
||||||
The result of the ``deleteMessages`` request is signaled to the persistent actor with a ``DeleteMessagesSuccess``
|
The result of the ``deleteMessages`` request is signaled to the persistent actor with a ``DeleteMessagesSuccess``
|
||||||
message if the delete was successful or a ``DeleteMessagesFailure`` message if it failed.
|
message if the delete was successful or a ``DeleteMessagesFailure`` message if it failed.
|
||||||
|
|
||||||
|
Message deletion doesn't affect highest sequence number of journal, even if all messages were deleted from journal after ``deleteMessages`` invocation.
|
||||||
|
|
||||||
Persistence status handling
|
Persistence status handling
|
||||||
---------------------------
|
---------------------------
|
||||||
Persisting, deleting and replaying messages can either succeed or fail.
|
Persisting, deleting and replaying messages can either succeed or fail.
|
||||||
|
|
@ -415,13 +417,13 @@ Persistent Views
|
||||||
``PersistentView`` is deprecated. Use :ref:`persistence-query-scala` instead. The corresponding
|
``PersistentView`` is deprecated. Use :ref:`persistence-query-scala` instead. The corresponding
|
||||||
query type is ``EventsByPersistenceId``. There are several alternatives for connecting the ``Source``
|
query type is ``EventsByPersistenceId``. There are several alternatives for connecting the ``Source``
|
||||||
to an actor corresponding to a previous ``PersistentView`` actor:
|
to an actor corresponding to a previous ``PersistentView`` actor:
|
||||||
|
|
||||||
* `Sink.actorRef`_ is simple, but has the disadvantage that there is no back-pressure signal from the
|
* `Sink.actorRef`_ is simple, but has the disadvantage that there is no back-pressure signal from the
|
||||||
destination actor, i.e. if the actor is not consuming the messages fast enough the mailbox of the actor will grow
|
destination actor, i.e. if the actor is not consuming the messages fast enough the mailbox of the actor will grow
|
||||||
* `mapAsync`_ combined with :ref:`actors-ask-lambda` is almost as simple with the advantage of back-pressure
|
* `mapAsync`_ combined with :ref:`actors-ask-lambda` is almost as simple with the advantage of back-pressure
|
||||||
being propagated all the way
|
being propagated all the way
|
||||||
* `ActorSubscriber`_ in case you need more fine grained control
|
* `ActorSubscriber`_ in case you need more fine grained control
|
||||||
|
|
||||||
The consuming actor may be a plain ``Actor`` or a ``PersistentActor`` if it needs to store its
|
The consuming actor may be a plain ``Actor`` or a ``PersistentActor`` if it needs to store its
|
||||||
own state (e.g. fromSequenceNr offset).
|
own state (e.g. fromSequenceNr offset).
|
||||||
|
|
||||||
|
|
@ -529,7 +531,7 @@ saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay a
|
||||||
Since it is acceptable for some applications to not use any snapshotting, it is legal to not configure a snapshot store,
|
Since it is acceptable for some applications to not use any snapshotting, it is legal to not configure a snapshot store,
|
||||||
however Akka will log a warning message when this situation is detected and then continue to operate until
|
however Akka will log a warning message when this situation is detected and then continue to operate until
|
||||||
an actor tries to store a snapshot, at which point the the operation will fail (by replying with an ``SaveSnapshotFailure`` for example).
|
an actor tries to store a snapshot, at which point the the operation will fail (by replying with an ``SaveSnapshotFailure`` for example).
|
||||||
|
|
||||||
Note that :ref:`cluster_sharding_scala` is using snapshots, so if you use Cluster Sharding you need to define a snapshot store plugin.
|
Note that :ref:`cluster_sharding_scala` is using snapshots, so if you use Cluster Sharding you need to define a snapshot store plugin.
|
||||||
|
|
||||||
Snapshot deletion
|
Snapshot deletion
|
||||||
|
|
@ -570,7 +572,7 @@ have not been confirmed within a configurable timeout.
|
||||||
|
|
||||||
The state of the sending actor, including which messages that have been sent and still not been
|
The state of the sending actor, including which messages that have been sent and still not been
|
||||||
confirmed by the recepient, must be persistent so that it can survive a crash of the sending actor
|
confirmed by the recepient, must be persistent so that it can survive a crash of the sending actor
|
||||||
or JVM. The ``AtLeastOnceDelivery`` trait does not persist anything by itself. It is your
|
or JVM. The ``AtLeastOnceDelivery`` trait does not persist anything by itself. It is your
|
||||||
responsibility to persist the intent that a message is sent and that a confirmation has been
|
responsibility to persist the intent that a message is sent and that a confirmation has been
|
||||||
received.
|
received.
|
||||||
|
|
||||||
|
|
@ -600,30 +602,30 @@ Relationship between deliver and confirmDelivery
|
||||||
------------------------------------------------
|
------------------------------------------------
|
||||||
|
|
||||||
To send messages to the destination path, use the ``deliver`` method after you have persisted the intent
|
To send messages to the destination path, use the ``deliver`` method after you have persisted the intent
|
||||||
to send the message.
|
to send the message.
|
||||||
|
|
||||||
The destination actor must send back a confirmation message. When the sending actor receives this
|
The destination actor must send back a confirmation message. When the sending actor receives this
|
||||||
confirmation message you should persist the fact that the message was delivered successfully and then call
|
confirmation message you should persist the fact that the message was delivered successfully and then call
|
||||||
the ``confirmDelivery`` method.
|
the ``confirmDelivery`` method.
|
||||||
|
|
||||||
If the persistent actor is not currently recovering, the ``deliver`` method will send the message to
|
If the persistent actor is not currently recovering, the ``deliver`` method will send the message to
|
||||||
the destination actor. When recovering, messages will be buffered until they have been confirmed using ``confirmDelivery``.
|
the destination actor. When recovering, messages will be buffered until they have been confirmed using ``confirmDelivery``.
|
||||||
Once recovery has completed, if there are outstanding messages that have not been confirmed (during the message replay),
|
Once recovery has completed, if there are outstanding messages that have not been confirmed (during the message replay),
|
||||||
the persistent actor will resend these before sending any other messages.
|
the persistent actor will resend these before sending any other messages.
|
||||||
|
|
||||||
Deliver requires a ``deliveryIdToMessage`` function to pass the provided ``deliveryId`` into the message so that correlation
|
Deliver requires a ``deliveryIdToMessage`` function to pass the provided ``deliveryId`` into the message so that correlation
|
||||||
between ``deliver`` and ``confirmDelivery`` is possible. The ``deliveryId`` must do the round trip. Upon receipt
|
between ``deliver`` and ``confirmDelivery`` is possible. The ``deliveryId`` must do the round trip. Upon receipt
|
||||||
of the message, destination actor will send the same``deliveryId`` wrapped in a confirmation message back to the sender.
|
of the message, destination actor will send the same``deliveryId`` wrapped in a confirmation message back to the sender.
|
||||||
The sender will then use it to call ``confirmDelivery`` method to complete delivery routine.
|
The sender will then use it to call ``confirmDelivery`` method to complete delivery routine.
|
||||||
|
|
||||||
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#at-least-once-example
|
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#at-least-once-example
|
||||||
|
|
||||||
The ``deliveryId`` generated by the persistence module is a strictly monotonically increasing sequence number
|
The ``deliveryId`` generated by the persistence module is a strictly monotonically increasing sequence number
|
||||||
without gaps. The same sequence is used for all destinations of the actor, i.e. when sending to multiple
|
without gaps. The same sequence is used for all destinations of the actor, i.e. when sending to multiple
|
||||||
destinations the destinations will see gaps in the sequence. It is not possible to use custom ``deliveryId``.
|
destinations the destinations will see gaps in the sequence. It is not possible to use custom ``deliveryId``.
|
||||||
However, you can send a custom correlation identifier in the message to the destination. You must then retain
|
However, you can send a custom correlation identifier in the message to the destination. You must then retain
|
||||||
a mapping between the internal ``deliveryId`` (passed into the ``deliveryIdToMessage`` function) and your custom
|
a mapping between the internal ``deliveryId`` (passed into the ``deliveryIdToMessage`` function) and your custom
|
||||||
correlation id (passed into the message). You can do this by storing such mapping in a ``Map(correlationId -> deliveryId)``
|
correlation id (passed into the message). You can do this by storing such mapping in a ``Map(correlationId -> deliveryId)``
|
||||||
from which you can retrieve the ``deliveryId`` to be passed into the ``confirmDelivery`` method once the receiver
|
from which you can retrieve the ``deliveryId`` to be passed into the ``confirmDelivery`` method once the receiver
|
||||||
of your message has replied with your custom correlation id.
|
of your message has replied with your custom correlation id.
|
||||||
|
|
||||||
|
|
@ -638,7 +640,7 @@ if no matching ``confirmDelivery`` was performed.
|
||||||
Support for snapshots is provided by ``getDeliverySnapshot`` and ``setDeliverySnapshot``.
|
Support for snapshots is provided by ``getDeliverySnapshot`` and ``setDeliverySnapshot``.
|
||||||
The ``AtLeastOnceDeliverySnapshot`` contains the full delivery state, including unconfirmed messages.
|
The ``AtLeastOnceDeliverySnapshot`` contains the full delivery state, including unconfirmed messages.
|
||||||
If you need a custom snapshot for other parts of the actor state you must also include the
|
If you need a custom snapshot for other parts of the actor state you must also include the
|
||||||
``AtLeastOnceDeliverySnapshot``. It is serialized using protobuf with the ordinary Akka
|
``AtLeastOnceDeliverySnapshot``. It is serialized using protobuf with the ordinary Akka
|
||||||
serialization mechanism. It is easiest to include the bytes of the ``AtLeastOnceDeliverySnapshot``
|
serialization mechanism. It is easiest to include the bytes of the ``AtLeastOnceDeliverySnapshot``
|
||||||
as a blob in your custom snapshot.
|
as a blob in your custom snapshot.
|
||||||
|
|
||||||
|
|
@ -774,7 +776,7 @@ Here is how everything is wired together:
|
||||||
Storage plugins
|
Storage plugins
|
||||||
===============
|
===============
|
||||||
|
|
||||||
Storage backends for journals and snapshot stores are pluggable in the Akka persistence extension.
|
Storage backends for journals and snapshot stores are pluggable in the Akka persistence extension.
|
||||||
|
|
||||||
Directory of persistence journal and snapshot store plugins is available at the Akka Community Projects page, see `Community plugins`_
|
Directory of persistence journal and snapshot store plugins is available at the Akka Community Projects page, see `Community plugins`_
|
||||||
|
|
||||||
|
|
@ -788,10 +790,10 @@ persistence extension will use "default" journal and snapshot-store plugins conf
|
||||||
akka.persistence.snapshot-store.plugin = ""
|
akka.persistence.snapshot-store.plugin = ""
|
||||||
|
|
||||||
However, these entries are provided as empty "", and require explicit user configuration via override in the user ``application.conf``.
|
However, these entries are provided as empty "", and require explicit user configuration via override in the user ``application.conf``.
|
||||||
For an example of journal plugin which writes messages to LevelDB see :ref:`local-leveldb-journal`.
|
For an example of journal plugin which writes messages to LevelDB see :ref:`local-leveldb-journal`.
|
||||||
For an example of snapshot store plugin which writes snapshots as individual files to the local filesystem see :ref:`local-snapshot-store`.
|
For an example of snapshot store plugin which writes snapshots as individual files to the local filesystem see :ref:`local-snapshot-store`.
|
||||||
|
|
||||||
Applications can provide their own plugins by implementing a plugin API and activate them by configuration.
|
Applications can provide their own plugins by implementing a plugin API and activate them by configuration.
|
||||||
Plugin development requires the following imports:
|
Plugin development requires the following imports:
|
||||||
|
|
||||||
.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#plugin-imports
|
.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#plugin-imports
|
||||||
|
|
@ -824,7 +826,7 @@ used for the plugin actor. If not specified, it defaults to ``akka.persistence.d
|
||||||
|
|
||||||
The journal plugin instance is an actor so the methods corresponding to requests from persistent actors
|
The journal plugin instance is an actor so the methods corresponding to requests from persistent actors
|
||||||
are executed sequentially. It may delegate to asynchronous libraries, spawn futures, or delegate to other
|
are executed sequentially. It may delegate to asynchronous libraries, spawn futures, or delegate to other
|
||||||
actors to achive parallelism.
|
actors to achive parallelism.
|
||||||
|
|
||||||
The journal plugin class must have a constructor without parameters or constructor with one ``com.typesafe.config.Config``
|
The journal plugin class must have a constructor without parameters or constructor with one ``com.typesafe.config.Config``
|
||||||
parameter. The plugin section of the actor system's config will be passed in the config constructor parameter.
|
parameter. The plugin section of the actor system's config will be passed in the config constructor parameter.
|
||||||
|
|
@ -955,7 +957,7 @@ Local snapshot store
|
||||||
Local snapshot store plugin config entry is ``akka.persistence.snapshot-store.local`` and it writes snapshot files to
|
Local snapshot store plugin config entry is ``akka.persistence.snapshot-store.local`` and it writes snapshot files to
|
||||||
the local filesystem. Enable this plugin by defining config property:
|
the local filesystem. Enable this plugin by defining config property:
|
||||||
|
|
||||||
.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#leveldb-snapshot-plugin-config
|
.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#leveldb-snapshot-plugin-config
|
||||||
|
|
||||||
The default storage location is a directory named ``snapshots`` in the current working
|
The default storage location is a directory named ``snapshots`` in the current working
|
||||||
directory. This can be changed by configuration where the specified path can be relative or absolute:
|
directory. This can be changed by configuration where the specified path can be relative or absolute:
|
||||||
|
|
@ -1024,7 +1026,7 @@ Note that in this case actor or view overrides only ``persistenceId`` method:
|
||||||
|
|
||||||
.. includecode:: code/docs/persistence/PersistenceMultiDocSpec.scala#default-plugins
|
.. includecode:: code/docs/persistence/PersistenceMultiDocSpec.scala#default-plugins
|
||||||
|
|
||||||
When persistent actor or view overrides ``journalPluginId`` and ``snapshotPluginId`` methods,
|
When persistent actor or view overrides ``journalPluginId`` and ``snapshotPluginId`` methods,
|
||||||
the actor or view will be serviced by these specific persistence plugins instead of the defaults:
|
the actor or view will be serviced by these specific persistence plugins instead of the defaults:
|
||||||
|
|
||||||
.. includecode:: code/docs/persistence/PersistenceMultiDocSpec.scala#override-plugins
|
.. includecode:: code/docs/persistence/PersistenceMultiDocSpec.scala#override-plugins
|
||||||
|
|
|
||||||
|
|
@ -157,6 +157,31 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) {
|
||||||
receiverProbe2.expectNoMsg(200.millis)
|
receiverProbe2.expectNoMsg(200.millis)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"not reset highestSequenceNr after message deletion" in {
|
||||||
|
journal ! ReplayMessages(0, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref)
|
||||||
|
1 to 5 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
|
||||||
|
receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L))
|
||||||
|
|
||||||
|
journal ! DeleteMessagesTo(pid, 3L, receiverProbe.ref)
|
||||||
|
receiverProbe.expectMsg(DeleteMessagesSuccess(3L))
|
||||||
|
|
||||||
|
journal ! ReplayMessages(0, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref)
|
||||||
|
4 to 5 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
|
||||||
|
receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L))
|
||||||
|
}
|
||||||
|
|
||||||
|
"not reset highestSequenceNr after journal cleanup" in {
|
||||||
|
journal ! ReplayMessages(0, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref)
|
||||||
|
1 to 5 foreach { i ⇒ receiverProbe.expectMsg(replayedMessage(i)) }
|
||||||
|
receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L))
|
||||||
|
|
||||||
|
journal ! DeleteMessagesTo(pid, Long.MaxValue, receiverProbe.ref)
|
||||||
|
receiverProbe.expectMsg(DeleteMessagesSuccess(Long.MaxValue))
|
||||||
|
|
||||||
|
journal ! ReplayMessages(0, Long.MaxValue, Long.MaxValue, pid, receiverProbe.ref)
|
||||||
|
receiverProbe.expectMsg(RecoverySuccess(highestSequenceNr = 5L))
|
||||||
|
}
|
||||||
|
|
||||||
"reject non-serializable events" in {
|
"reject non-serializable events" in {
|
||||||
// there is no chance that a journal could create a data representation for type of event
|
// there is no chance that a journal could create a data representation for type of event
|
||||||
val notSerializableEvent = new Object { override def toString = "not serializable" }
|
val notSerializableEvent = new Object { override def toString = "not serializable" }
|
||||||
|
|
|
||||||
|
|
@ -51,6 +51,7 @@ trait AsyncRecovery {
|
||||||
* number after recovery as the starting point when persisting new events.
|
* number after recovery as the starting point when persisting new events.
|
||||||
* This sequence number is also used as `toSequenceNr` in subsequent call
|
* This sequence number is also used as `toSequenceNr` in subsequent call
|
||||||
* to [[#asyncReplayMessages]] unless the user has specified a lower `toSequenceNr`.
|
* to [[#asyncReplayMessages]] unless the user has specified a lower `toSequenceNr`.
|
||||||
|
* Journal must maintain the highest sequence number and never decrease it.
|
||||||
*
|
*
|
||||||
* This call is protected with a circuit-breaker.
|
* This call is protected with a circuit-breaker.
|
||||||
*
|
*
|
||||||
|
|
|
||||||
|
|
@ -215,6 +215,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
|
||||||
* (inclusive).
|
* (inclusive).
|
||||||
*
|
*
|
||||||
* This call is protected with a circuit-breaker.
|
* This call is protected with a circuit-breaker.
|
||||||
|
* Message deletion doesn't affect the highest sequence number of messages, journal must maintain the highest sequence number and never decrease it.
|
||||||
*/
|
*/
|
||||||
def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit]
|
def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit]
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue