Merge pull request #2056 from akka/wip-persistent-java8-∂π

Wip persistent java8 ∂π
This commit is contained in:
Björn Antonsson 2014-03-05 11:11:59 +01:00
commit 3a98b2cd79
16 changed files with 1862 additions and 0 deletions

View file

@@ -15,3 +15,4 @@ Actors
testing
lambda-actors
lambda-fsm
lambda-persistence

View file

@@ -0,0 +1,657 @@
.. _persistence-lambda-java:
######################################
Persistence (Java with Lambda Support)
######################################
Akka persistence enables stateful actors to persist their internal state so that it can be recovered when an actor
is started, restarted after a JVM crash or by a supervisor, or migrated in a cluster. The key concept behind Akka
persistence is that only changes to an actor's internal state are persisted but never its current state directly
(except for optional snapshots). These changes are only ever appended to storage, nothing is ever mutated, which
allows for very high transaction rates and efficient replication. Stateful actors are recovered by replaying stored
changes to these actors from which they can rebuild internal state. This can be either the full history of changes
or starting from a snapshot which can dramatically reduce recovery times. Akka persistence also provides point-to-point
communication channels with at-least-once message delivery semantics.
.. warning::
This module is marked as **“experimental”** as of its introduction in Akka 2.3.0. We will continue to
improve this API based on our users' feedback, which implies that while we try to keep incompatible
changes to a minimum the binary compatibility guarantee for maintenance releases does not apply to the
contents of the ``akka.persistence`` package.
Akka persistence is inspired by the `eventsourced`_ library. It follows the same concepts and architecture as
`eventsourced`_ but differs significantly at the API and implementation level.
.. _eventsourced: https://github.com/eligosource/eventsourced
Dependencies
============
Akka persistence is a separate jar file. Make sure that you have the following dependency in your project::
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-persistence-experimental_@binVersion@</artifactId>
<version>@version@</version>
</dependency>
Architecture
============
* *Processor*: A processor is a persistent, stateful actor. Messages sent to a processor are written to a journal
before its ``receive`` method is called. When a processor is started or restarted, journaled messages are replayed
to that processor, so that it can recover internal state from these messages.
* *View*: A view is a persistent, stateful actor that receives journaled messages that have been written by another
processor. A view itself does not journal new messages, instead, it updates internal state only from a processor's
replicated message stream.
* *Channel*: Channels are used by processors and views to communicate with other actors. They prevent replayed
messages from being redundantly delivered to these actors and provide at-least-once message delivery semantics, even in
case of sender and receiver JVM crashes.
* *Journal*: A journal stores the sequence of messages sent to a processor. An application can control which messages
are journaled and which are received by the processor without being journaled. The storage backend of a journal is
pluggable. The default journal storage plugin writes to the local filesystem; replicated journals are available as
`Community plugins`_.
* *Snapshot store*: A snapshot store persists snapshots of a processor's or a view's internal state. Snapshots are
used for optimizing recovery times. The storage backend of a snapshot store is pluggable. The default snapshot
storage plugin writes to the local filesystem.
* *Event sourcing*: Based on the building blocks described above, Akka persistence provides abstractions for the
development of event sourced applications (see section :ref:`event-sourcing-java-lambda`).
.. _Community plugins: https://gist.github.com/krasserm/8612920#file-akka-persistence-plugins-md
.. _processors-lambda-java:
Processors
==========
A processor can be implemented by extending the ``AbstractProcessor`` class and implementing the
``receive`` method.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#definition
Processors only write messages of type ``Persistent`` to the journal; others are received without being persisted.
When a processor's ``receive`` method is called with a ``Persistent`` message it can safely assume that this message
has been successfully written to the journal. If a journal fails to write a ``Persistent`` message then the processor
is stopped, by default. If a processor should continue running on persistence failures it must handle
``PersistenceFailure`` messages. In this case, a processor may want to inform the sender about the failure,
so that the sender can re-send the message, if needed.
An ``AbstractProcessor`` itself is an ``Actor`` and can therefore be instantiated with ``actorOf``.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#usage
Recovery
--------
By default, a processor is automatically recovered on start and on restart by replaying journaled messages.
New messages sent to a processor during recovery do not interfere with replayed messages. New messages will
only be received by a processor after recovery completes.
Recovery customization
^^^^^^^^^^^^^^^^^^^^^^
Automated recovery on start can be disabled by overriding ``preStart`` with an empty implementation.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#recover-on-start-disabled
In this case, a processor must be recovered explicitly by sending it a ``Recover`` message.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#recover-explicit
If not overridden, ``preStart`` sends a ``Recover`` message to ``self()``. Applications may also override
``preStart`` to define further ``Recover`` parameters such as an upper sequence number bound, for example.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#recover-on-start-custom
An upper sequence number bound can be used to recover a processor to a past state instead of the current state. Automated
recovery on restart can be disabled by overriding ``preRestart`` with an empty implementation.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#recover-on-restart-disabled
Recovery status
^^^^^^^^^^^^^^^
A processor can query its own recovery status via the methods
.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#recovery-status
.. _failure-handling-java-lambda:
Failure handling
^^^^^^^^^^^^^^^^
A persistent message that caused an exception will be received again by a processor after restart. To prevent
a replay of that message during recovery it can be deleted.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#deletion
Message deletion
----------------
A processor can delete a single message by calling the ``deleteMessage`` method with the sequence number of
that message as argument. An optional ``permanent`` parameter specifies whether the message shall be permanently
deleted from the journal or only marked as deleted. In both cases, the message won't be replayed. Later extensions
to Akka persistence will allow replaying messages that have been marked as deleted, which can be useful for debugging
purposes, for example. To delete all messages (journaled by a single processor) up to a specified sequence number,
processors should call the ``deleteMessages`` method.
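For example, a processor could permanently delete its complete history up to the current message when it receives a
(hypothetical) ``"cleanup"`` payload; the class name below is illustrative::

  class MyCleanupProcessor extends AbstractProcessor {
    @Override public PartialFunction<Object, BoxedUnit> receive() {
      return ReceiveBuilder.
        match(Persistent.class, p -> p.payload().equals("cleanup"), p -> {
          // permanently delete all messages journaled by this processor
          // up to (and including) the current sequence number
          deleteMessages(p.sequenceNr(), true);
        }).
        match(Persistent.class, p -> {/* ... */}).build();
    }
  }
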
Identifiers
-----------
A processor must have an identifier that doesn't change across different actor incarnations. It defaults to the
``String`` representation of the processor's path without the address part and can be obtained via the ``processorId``
method.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#processor-id
Applications can customize a processor's id by specifying an actor name during processor creation as shown in
section :ref:`processors-lambda-java`. This changes that processor's name in its actor hierarchy and hence influences only
part of the processor id. To fully customize a processor's id, the ``processorId`` method must be overridden.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#processor-id-override
Overriding ``processorId`` is the recommended way to generate stable identifiers.
.. _views-java-lambda:
Views
=====
Views can be implemented by extending the ``AbstractView`` abstract class and implementing the ``receive`` and the
``processorId``
methods.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#view
The ``processorId`` identifies the processor from which the view receives journaled messages. It is not necessary
that the referenced processor is actually running. Views read messages from a processor's journal directly. When a
processor is started later and begins to write new messages, the corresponding view is updated automatically, by
default.
Updates
-------
The default update interval of all views of an actor system is configurable:
.. includecode:: ../scala/code/docs/persistence/PersistenceDocSpec.scala#auto-update-interval
``View`` implementation classes may also override the ``autoUpdateInterval`` method to return a custom update
interval for a specific view class or view instance. Applications may also trigger additional updates at
any time by sending a view an ``Update`` message.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#view-update
If the ``await`` parameter is set to ``true``, messages that follow the ``Update`` request are processed when the
incremental message replay triggered by that update request has completed. If set to ``false`` (default), messages
following the update request may interleave with the replayed message stream. Automated updates always run with
``await = false``.
Automated updates of all views of an actor system can be turned off by configuration:
.. includecode:: ../scala/code/docs/persistence/PersistenceDocSpec.scala#auto-update
Implementation classes may override the configured default value by overriding the ``autoUpdate`` method. To
limit the number of replayed messages per update request, applications can configure a custom
``akka.persistence.view.auto-update-replay-max`` value or override the ``autoUpdateReplayMax`` method. The number
of replayed messages for manual updates can be limited with the ``replayMax`` parameter of the ``Update`` message.
Recovery
--------
Initial recovery of views works in the very same way as for :ref:`processors-lambda-java` (i.e. by sending a ``Recover`` message
to self). The maximum number of replayed messages during initial recovery is determined by ``autoUpdateReplayMax``.
Further possibilities to customize initial recovery are explained in section :ref:`processors-lambda-java`.
Identifiers
-----------
A view must have an identifier that doesn't change across different actor incarnations. It defaults to the
``String`` representation of the actor path without the address part and can be obtained via the ``viewId``
method.
Applications can customize a view's id by specifying an actor name during view creation. This changes that view's
name in its actor hierarchy and hence influences only part of the view id. To fully customize a view's id, the
``viewId`` method must be overridden. Overriding ``viewId`` is the recommended way to generate stable identifiers.
The ``viewId`` must differ from the referenced ``processorId``, unless :ref:`snapshots-java-lambda` of a view and its
processor shall be shared (which is usually not what applications want).
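For example, a view with fully customized identifiers could look like this (the identifier values are illustrative)::

  class MyStableView extends AbstractView {
    @Override
    public String viewId() {
      return "my-stable-view-id";
    }
    @Override
    public String processorId() {
      return "my-stable-processor-id";
    }
    @Override public PartialFunction<Object, BoxedUnit> receive() {
      return ReceiveBuilder.
        match(Persistent.class, persistent -> {/* ... */}).build();
    }
  }
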
.. _channels-java-lambda:
Channels
========
Channels are special actors that are used by processors or views to communicate with other actors (channel
destinations). The following discusses channels in context of processors but this is also applicable to views.
Channels prevent redundant delivery of replayed messages to destinations during processor recovery. A replayed
message is retained by a channel if its delivery has been confirmed by a destination.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#channel-example
A channel is ready to use once it has been created; no recovery or further activation is needed. A ``Deliver``
request instructs a channel to send a ``Persistent`` message to a destination. A destination is provided as
``ActorPath`` and messages are sent by the channel via that path's ``ActorSelection``. Sender references are
preserved by a channel, therefore, a destination can reply to the sender of a ``Deliver`` request.
If a processor wants to reply to a ``Persistent`` message sender it should use the ``sender()`` path as
channel destination.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#channel-example-reply
Persistent messages delivered by a channel are of type ``ConfirmablePersistent``. ``ConfirmablePersistent`` extends
``Persistent`` by adding the methods ``confirm`` and ``redeliveries`` (see also :ref:`redelivery-java-lambda`). A
channel destination confirms the delivery of a ``ConfirmablePersistent`` message by calling ``confirm()`` on that
message. This asynchronously writes a confirmation entry to the journal. Replayed messages internally contain
confirmation entries which allows a channel to decide if it should retain these messages or not.
A ``Processor`` can also be used as a channel destination, i.e. it can persist ``ConfirmablePersistent`` messages too.
.. _redelivery-java-lambda:
Message re-delivery
-------------------
Channels re-deliver messages to destinations if they do not confirm delivery within a configurable timeout.
This timeout can be specified as ``redeliverInterval`` when creating a channel, optionally together with the
maximum number of re-deliveries a channel should attempt for each unconfirmed message. The number of re-delivery
attempts can be obtained via the ``redeliveries`` method on ``ConfirmablePersistent``.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#channel-custom-settings
A channel keeps messages in memory until their successful delivery has been confirmed or the maximum number of
re-deliveries is reached. To be notified about messages that have reached the maximum number of re-deliveries,
applications can register a listener at channel creation.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#channel-custom-listener
A listener receives ``RedeliverFailure`` notifications containing all messages that could not be delivered. On
receiving a ``RedeliverFailure`` message, an application may decide to restart the sending processor to enforce
a re-send of these messages to the channel or confirm these messages to prevent further re-sends. The sending
processor can also be restarted any time later to re-send unconfirmed messages.
This combination of
* message persistence by sending processors
* message replays by sending processors
* message re-deliveries by channels and
* application-level confirmations (acknowledgements) by destinations
enables channels to provide at-least-once message delivery semantics. Possible duplicates can be detected by
destinations by tracking message sequence numbers. Message sequence numbers are generated per sending processor.
Depending on how a processor routes outbound messages to destinations, they may either see a contiguous message
sequence or a sequence with gaps.
.. warning::
If a processor emits more than one outbound message per inbound ``Persistent`` message it **must** use a
separate channel for each outbound message to ensure that confirmations are uniquely identifiable, otherwise,
at-least-once message delivery semantics do not apply. This rule has been introduced to avoid writing additional
outbound message identifiers to the journal which would decrease the overall throughput. It is furthermore
recommended to collapse multiple outbound messages to the same destination into a single outbound message,
otherwise, if sent via multiple channels, their ordering is not defined.
If an application wants to have more control over how sequence numbers are assigned to messages, it should use an
application-specific sequence number generator and include the generated sequence numbers into the ``payload``
of ``Persistent`` messages.
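As a sketch of sequence-number-based duplicate detection, a destination that receives messages from a single sending
processor could track the highest sequence number it has processed so far (the class name is illustrative)::

  class MyDeduplicatingDestination extends AbstractActor {
    // highest sequence number processed so far (a single sending processor is assumed)
    private long lastSequenceNr = 0L;
    @Override public PartialFunction<Object, BoxedUnit> receive() {
      return ReceiveBuilder.
        match(ConfirmablePersistent.class, p -> {
          if (p.sequenceNr() > lastSequenceNr) {
            lastSequenceNr = p.sequenceNr();
            // process the payload
          } // else: duplicate delivery, ignore the payload
          p.confirm(); // confirm in both cases to stop further re-deliveries
        }).build();
    }
  }
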
Persistent channels
-------------------
Channels created with ``Channel.props`` do not persist messages. These channels are usually used in combination
with a sending processor that takes care of persistence, hence, channel-specific persistence is not necessary in
this case. They are referred to as transient channels in the following.
Persistent channels are like transient channels but additionally persist messages before delivering them. Messages
that have been persisted by a persistent channel are deleted when destinations confirm their delivery. A persistent
channel can be created with ``PersistentChannel.props`` and configured with a ``PersistentChannelSettings`` object.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#persistent-channel-example
A persistent channel is useful for delivery of messages to slow destinations or destinations that are unavailable
for a long time. It can constrain the number of pending confirmations based on the ``pendingConfirmationsMax``
and ``pendingConfirmationsMin`` parameters of ``PersistentChannelSettings``.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#persistent-channel-watermarks
It suspends delivery when the number of pending confirmations reaches ``pendingConfirmationsMax`` and resumes
delivery again when this number falls below ``pendingConfirmationsMin``. This prevents both flooding destinations
with more messages than they can process and unbounded memory consumption by the channel. A persistent channel
continues to persist new messages even when message delivery is temporarily suspended.
Standalone usage
----------------
Applications may also use channels standalone. Transient channels can be used standalone if re-delivery attempts
to destinations are required but message loss in case of a sender JVM crash is not an issue. If message loss in
case of a sender JVM crash is an issue, persistent channels should be used. In this case, applications may want to
receive replies from the channel indicating whether messages have been successfully persisted or not. This can be enabled by
creating the channel with the ``replyPersistent`` configuration parameter set to ``true``:
.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#persistent-channel-reply
With this setting, either the successfully persisted message or a ``PersistenceFailure`` message is replied to the
sender. In the latter case, the sender should re-send the message.
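A sender could then handle both reply types, for example as follows (class and message names are illustrative, and the
channel is assumed to have been created with ``replyPersistent = true``)::

  class MyDeliveryActor extends AbstractActor {
    private final ActorRef channel;     // persistent channel created with replyPersistent = true
    private final ActorRef destination;
    public MyDeliveryActor(ActorRef channel, ActorRef destination) {
      this.channel = channel;
      this.destination = destination;
    }
    @Override public PartialFunction<Object, BoxedUnit> receive() {
      return ReceiveBuilder.
        match(String.class, s ->
          channel.tell(Deliver.create(Persistent.create(s), destination.path()), self())).
        match(Persistent.class, p -> {
          // the channel successfully persisted the message
        }).
        match(PersistenceFailure.class, f -> {
          // persistence failed, f.payload() may be re-sent if needed
        }).build();
    }
  }
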
Identifiers
-----------
In the same way as :ref:`processors-lambda-java` and :ref:`views-java-lambda`, channels also have an identifier that defaults to a channel's
path. A channel identifier can therefore be customized by using a custom actor name at channel creation. This changes
that channel's name in its actor hierarchy and hence influences only part of the channel identifier. To fully customize
a channel identifier, it should be provided as an argument to ``Channel.props(String)`` or ``PersistentChannel.props(String)``
(this is the recommended way to generate stable identifiers).
.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#channel-id-override
Persistent messages
===================
Payload
-------
The payload of a ``Persistent`` message can be obtained via its ``payload`` method. Inside processors, new messages
must be derived from the current persistent message before sending them via a channel, either by calling ``p.withPayload(...)``
or ``Persistent.create(..., getCurrentPersistentMessage())`` where ``getCurrentPersistentMessage()`` is defined on
``AbstractProcessor``.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#current-message
This is necessary for delivery confirmations to work properly. Both
ways are equivalent but we recommend using ``p.withPayload(...)`` for clarity. It is not allowed to send a message
that has been created with ``Persistent.create(...)`` (i.e. not derived from the current persistent message) via a
channel. Doing so would redeliver the message on every replay even though its delivery was already confirmed by a destination.
Sequence number
---------------
The sequence number of a ``Persistent`` message can be obtained via its ``sequenceNr`` method. Persistent
messages are assigned sequence numbers on a per-processor basis (or per channel basis if used
standalone). A sequence starts at ``1L`` and doesn't contain gaps unless a processor deletes messages.
.. _snapshots-java-lambda:
Snapshots
=========
Snapshots can dramatically reduce recovery times of processors and views. The following discusses snapshots
in context of processors but this is also applicable to views.
Processors can save snapshots of internal state by calling the ``saveSnapshot`` method. If saving of a snapshot
succeeds, the processor receives a ``SaveSnapshotSuccess`` message, otherwise a ``SaveSnapshotFailure`` message.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#save-snapshot
During recovery, the processor is offered a previously saved snapshot via a ``SnapshotOffer`` message from
which it can initialize internal state.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#snapshot-offer
The replayed messages that follow the ``SnapshotOffer`` message, if any, are younger than the offered snapshot.
They finally recover the processor to its current (i.e. latest) state.
In general, a processor is only offered a snapshot if that processor has previously saved one or more snapshots
and at least one of these snapshots matches the ``SnapshotSelectionCriteria`` that can be specified for recovery.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#snapshot-criteria
If not specified, they default to ``SnapshotSelectionCriteria.latest()`` which selects the latest (= youngest) snapshot.
To disable snapshot-based recovery, applications should use ``SnapshotSelectionCriteria.none()``. A recovery where no
saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay all journaled messages.
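For example, assuming a ``processor`` reference as in the previous example, snapshot-based recovery can be disabled
like this::

  // replay all journaled messages instead of starting from a snapshot
  processor.tell(Recover.create(SnapshotSelectionCriteria.none()), null);
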
Snapshot deletion
-----------------
A processor can delete individual snapshots by calling the ``deleteSnapshot`` method with the sequence number and the
timestamp of a snapshot as argument. To bulk-delete snapshots matching ``SnapshotSelectionCriteria``, processors should
use the ``deleteSnapshots`` method.
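For example, a processor could delete all snapshots that are older than the one it has just saved (the class name is
illustrative)::

  class MySnapshottingProcessor extends AbstractProcessor {
    @Override public PartialFunction<Object, BoxedUnit> receive() {
      return ReceiveBuilder.
        match(SaveSnapshotSuccess.class, ss -> {
          SnapshotMetadata metadata = ss.metadata();
          // delete all snapshots that are older than the one just saved
          deleteSnapshots(SnapshotSelectionCriteria
            .create(metadata.sequenceNr() - 1, metadata.timestamp()));
        }).
        match(Persistent.class, p -> {/* ... */}).build();
    }
  }
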
.. _event-sourcing-java-lambda:
Event sourcing
==============
In all the examples so far, messages that change a processor's state have been sent as ``Persistent`` messages
by an application, so that they can be replayed during recovery. From this point of view, the journal acts as
a write-ahead-log for whatever ``Persistent`` messages a processor receives. This is also known as *command
sourcing*. Commands, however, may fail and some applications cannot tolerate command failures during recovery.
For these applications `Event Sourcing`_ is a better choice. Applied to Akka persistence, the basic idea behind
event sourcing is quite simple. A processor receives a (non-persistent) command which is first validated to determine
whether it can be applied to the current state. Here, validation can mean anything, from simple inspection of a command
message's fields up to a conversation with several external services, for example. If validation succeeds, events
are generated from the command, representing the effect of the command. These events are then persisted and, after
successful persistence, used to change a processor's state. When the processor needs to be recovered, only the
persisted events are replayed, and we know that they can be successfully applied. In other words, events
cannot fail when being replayed to a processor, in contrast to commands. Eventsourced processors may of course
also process commands that do not change application state, such as query commands.
.. _Event Sourcing: http://martinfowler.com/eaaDev/EventSourcing.html
Akka persistence supports event sourcing with the ``AbstractEventsourcedProcessor`` abstract class (which implements
event sourcing as a pattern on top of command sourcing). A processor that extends this abstract class does not handle
``Persistent`` messages directly but uses the ``persist`` method to persist and handle events. The behavior of an
``AbstractEventsourcedProcessor`` is defined by implementing ``receiveRecover`` and ``receiveCommand``. This is
demonstrated in the following example.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/sample/persistence/EventsourcedExample.java#eventsourced-example
The example defines two data types, ``Cmd`` and ``Evt``, to represent commands and events, respectively. The
``state`` of the ``ExampleProcessor`` is a list of persisted event data contained in ``ExampleState``.
The processor's ``receiveRecover`` method defines how ``state`` is updated during recovery by handling ``Evt``
and ``SnapshotOffer`` messages. The processor's ``receiveCommand`` method is a command handler. In this example,
a command is handled by generating two events which are then persisted and handled. Events are persisted by calling
``persist`` with an event (or a sequence of events) as first argument and an event handler as second argument.
The ``persist`` method persists events asynchronously and the event handler is executed for successfully persisted
events. Successfully persisted events are internally sent back to the processor as individual messages that trigger
event handler executions. An event handler may close over processor state and mutate it. The sender of a persisted
event is the sender of the corresponding command. This allows event handlers to reply to the sender of a command
(not shown).
The main responsibility of an event handler is changing processor state using event data and notifying others
about successful state changes by publishing events.
When persisting events with ``persist`` it is guaranteed that the processor will not receive further commands between
the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist``
calls in the context of a single command. The example also shows how to switch between different command handlers
with ``context().become()`` and ``context().unbecome()``.
The easiest way to run this example yourself is to download `Typesafe Activator <http://typesafe.com/platform/getstarted>`_
and open the tutorial named `Akka Persistence Samples with Java <http://typesafe.com/activator/template/akka-sample-persistence-java8>`_.
It contains instructions on how to run the ``EventsourcedExample``.
Reliable event delivery
-----------------------
Sending events from an event handler to another actor has at-most-once delivery semantics. For at-least-once delivery,
:ref:`channels-java-lambda` must be used. In this case, replayed events (received by ``receiveRecover``) must also be
sent to a channel, as shown in the following example:
.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#reliable-event-delivery
In larger integration scenarios, channel destinations may be actors that submit received events to an external
message broker, for example. After having successfully submitted an event, they should call ``confirm()`` on the
received ``ConfirmablePersistent`` message.
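A minimal sketch of such a destination, assuming the actual broker submission is handled elsewhere, could look like
this (the class name is illustrative)::

  class MyEventForwarder extends AbstractActor {
    @Override public PartialFunction<Object, BoxedUnit> receive() {
      return ReceiveBuilder.
        match(ConfirmablePersistent.class, p -> {
          Object event = p.payload();
          // submit the event to the external message broker here (not shown)
          // and only confirm after the submission succeeded
          p.confirm();
        }).build();
    }
  }
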
Batch writes
============
To optimize throughput, an ``AbstractProcessor`` internally batches received ``Persistent`` messages under high load
before
writing them to the journal (as a single batch). The batch size dynamically grows from 1 under low and moderate loads
to a configurable maximum size (default is ``200``) under high load.
.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#max-message-batch-size
A new batch write is triggered by a processor as soon as a batch reaches the maximum size or if the journal completed
writing the previous batch. Batch writes are never timer-based which keeps latencies at a minimum.
Applications that want to have more explicit control over batch writes and batch sizes can send processors
``PersistentBatch`` messages.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#batch-write
``Persistent`` messages contained in a ``PersistentBatch`` are always written atomically, even if the batch
size is greater than ``max-message-batch-size``. Also, a ``PersistentBatch`` is written in isolation from other batches.
``Persistent`` messages contained in a ``PersistentBatch`` are received individually by a processor.
``PersistentBatch`` messages, for example, are used internally by an ``AbstractEventsourcedProcessor`` to ensure atomic
writes of events. All events that are persisted in context of a single command are written as a single batch to the
journal (even if ``persist`` is called multiple times per command). The recovery of an ``AbstractEventsourcedProcessor``
will therefore never be done partially (with only a subset of events persisted by a single command).
Confirmation and deletion operations performed by :ref:`channels-java-lambda` are also batched. The maximum
confirmation and deletion batch sizes are configurable with ``akka.persistence.journal.max-confirmation-batch-size``
and ``akka.persistence.journal.max-deletion-batch-size``, respectively.
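For example, both batch sizes could be lowered in the application configuration (the values below are illustrative,
not the defaults)::

  akka.persistence.journal.max-confirmation-batch-size = 100
  akka.persistence.journal.max-deletion-batch-size = 100
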
Storage plugins
===============
Storage backends for journals and snapshot stores are pluggable in Akka persistence. The default journal plugin
writes messages to LevelDB (see :ref:`local-leveldb-journal-java-lambda`). The default snapshot store plugin writes
snapshots as individual files to the local filesystem (see :ref:`local-snapshot-store-java-lambda`). Applications can
provide their own plugins by implementing a plugin API and activate them by configuration. Plugin development
requires the following imports:
.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistencePluginDocTest.java#plugin-imports
Journal plugin API
------------------
A journal plugin either extends ``SyncWriteJournal`` or ``AsyncWriteJournal``. ``SyncWriteJournal`` is an
actor that should be extended when the storage backend API only supports synchronous, blocking writes. In this
case, the methods to be implemented are:
.. includecode:: ../../../akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java#sync-write-plugin-api
``AsyncWriteJournal`` is an actor that should be extended if the storage backend API supports asynchronous,
non-blocking writes. In this case, the methods to be implemented are:
.. includecode:: ../../../akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java#async-write-plugin-api
Message replays and sequence number recovery are always asynchronous; therefore, any journal plugin must implement:
.. includecode:: ../../../akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncRecoveryPlugin.java#async-replay-plugin-api
A journal plugin can be activated with the following minimal configuration:
.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#journal-plugin-config
The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher
used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``
for ``SyncWriteJournal`` plugins and ``akka.actor.default-dispatcher`` for ``AsyncWriteJournal`` plugins.
Snapshot store plugin API
-------------------------
A snapshot store plugin must extend the ``SnapshotStore`` actor and implement the following methods:
.. includecode:: ../../../akka-persistence/src/main/java/akka/persistence/snapshot/japi/SnapshotStorePlugin.java#snapshot-store-plugin-api
A snapshot store plugin can be activated with the following minimal configuration:
.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#snapshot-store-plugin-config
The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher
used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``.
Pre-packaged plugins
====================
.. _local-leveldb-journal-java-lambda:
Local LevelDB journal
---------------------
The default journal plugin is ``akka.persistence.journal.leveldb`` which writes messages to a local LevelDB
instance. The default location of the LevelDB files is a directory named ``journal`` in the current working
directory. This location can be changed by configuration where the specified path can be relative or absolute:
.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#journal-config
With this plugin, each actor system runs its own private LevelDB instance.
.. _shared-leveldb-journal-java-lambda:
Shared LevelDB journal
----------------------
A LevelDB instance can also be shared by multiple actor systems (on the same or on different nodes). This, for
example, allows processors to fail over to a backup node and continue using the shared journal instance from the
backup node.
.. warning::
A shared LevelDB instance is a single point of failure and should therefore only be used for testing
purposes. Highly-available, replicated journals are available as `Community plugins`_.
A shared LevelDB instance is started by instantiating the ``SharedLeveldbStore`` actor.
.. includecode:: code/docs/persistence/PersistencePluginDocTest.java#shared-store-creation
By default, the shared instance writes journaled messages to a local directory named ``journal`` in the current
working directory. The storage location can be changed by configuration:
.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#shared-store-config
Actor systems that use a shared LevelDB store must activate the ``akka.persistence.journal.leveldb-shared``
plugin.
.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#shared-journal-config
This plugin must be initialized by injecting the (remote) ``SharedLeveldbStore`` actor reference. Injection is
done by calling the ``SharedLeveldbJournal.setStore`` method with the actor reference as argument.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistencePluginDocTest.java#shared-store-usage
Internal journal commands (sent by processors) are buffered until injection completes. Injection is idempotent,
i.e. only the first injection is used.
.. _local-snapshot-store-java-lambda:
Local snapshot store
--------------------
The default snapshot store plugin is ``akka.persistence.snapshot-store.local``. It writes snapshot files to
the local filesystem. The default storage location is a directory named ``snapshots`` in the current working
directory. This can be changed by configuration where the specified path can be relative or absolute:
.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#snapshot-config
Custom serialization
====================
Serialization of snapshots and payloads of ``Persistent`` messages is configurable with Akka's
:ref:`serialization-java` infrastructure. For example, if an application wants to serialize
* payloads of type ``MyPayload`` with a custom ``MyPayloadSerializer`` and
* snapshots of type ``MySnapshot`` with a custom ``MySnapshotSerializer``
it must add
.. includecode:: ../scala/code/docs/persistence/PersistenceSerializerDocSpec.scala#custom-serializer-config
to the application configuration. If not specified, a default serializer is used.
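A custom serializer itself extends ``akka.serialization.JSerializer``. The following sketch assumes a hypothetical
``MyPayload`` class with a single public ``String`` field ``data`` and a matching constructor; the serializer
identifier is chosen arbitrarily (identifiers 0-16 are reserved by Akka)::

  import java.nio.charset.StandardCharsets;
  import akka.serialization.JSerializer;

  class MyPayloadSerializer extends JSerializer {
    @Override public int identifier() {
      return 77124; // unique serializer id, chosen arbitrarily here
    }
    @Override public boolean includeManifest() {
      return false;
    }
    @Override public byte[] toBinary(Object o) {
      // application-specific: MyPayload is assumed to carry a single String field
      return ((MyPayload) o).data.getBytes(StandardCharsets.UTF_8);
    }
    @Override public Object fromBinaryJava(byte[] bytes, Class<?> manifest) {
      return new MyPayload(new String(bytes, StandardCharsets.UTF_8));
    }
  }
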
Testing
=======
When running tests with LevelDB default settings in ``sbt``, make sure to set ``fork := true`` in your sbt project;
otherwise, you'll see an ``UnsatisfiedLinkError``. Alternatively, you can switch to a LevelDB Java port by setting
.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#native-config
or
.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#shared-store-native-config
in your Akka configuration. The LevelDB Java port is for testing purposes only.

View file

@@ -4,6 +4,10 @@
Persistence
###########
Java 8 lambda expressions are also supported now. (See section :ref:`persistence-lambda-java`)
Akka persistence enables stateful actors to persist their internal state so that it can be recovered when an actor
is started, restarted after a JVM crash or by a supervisor, or migrated in a cluster. The key concept behind Akka
persistence is that only changes to an actor's internal state are persisted but never its current state directly
@@ -13,6 +17,10 @@ changes to these actors from which they can rebuild internal state. This can be
or starting from a snapshot which can dramatically reduce recovery times. Akka persistence also provides point-to-point
communication channels with at-least-once message delivery semantics.
.. note::

  Java 8 lambda expressions are also supported now. (See section :ref:`persistence-lambda-java`)
.. warning::
This module is marked as **“experimental”** as of its introduction in Akka 2.3.0. We will continue to

View file

@@ -10,6 +10,7 @@ import scala.collection.immutable
import akka.japi.{ Procedure, Util }
import akka.persistence.JournalProtocol._
import akka.actor.AbstractActor
/**
* INTERNAL API.
@@ -282,3 +283,50 @@ abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Events
*/
def onReceiveCommand(msg: Any): Unit
}
/**
* Java API: compatible with lambda expressions (to be used with [[akka.japi.pf.ReceiveBuilder]]):
* command handler. Typically validates commands against current state (and/or by
* communication with other actors). On successful validation, one or more events are
* derived from a command and these events are then persisted by calling `persist`.
* Commands sent to event sourced processors must not be [[Persistent]] or
* [[PersistentBatch]] messages. In this case an `UnsupportedOperationException` is
* thrown by the processor.
*/
abstract class AbstractEventsourcedProcessor extends AbstractActor with EventsourcedProcessor {
/**
* Java API: asynchronously persists `event`. On successful persistence, `handler` is called with the
* persisted event. It is guaranteed that no new commands will be received by a processor
* between a call to `persist` and the execution of its `handler`. This also holds for
* multiple `persist` calls per received command. Internally, this is achieved by stashing new
* commands and unstashing them when the `event` has been persisted and handled. The stash used
* for that is an internal stash which doesn't interfere with the user stash inherited from
* [[UntypedProcessor]].
*
* An event `handler` may close over processor state and modify it. The `getSender()` of a persisted
* event is the sender of the corresponding command. This means that one can reply to a command
* sender within an event `handler`.
*
* Within an event handler, applications usually update processor state using persisted event
* data, notify listeners and reply to command senders.
*
* If persistence of an event fails, the processor will be stopped. This can be customized by
* handling [[PersistenceFailure]] in [[receiveCommand]].
*
* @param event event to be persisted.
* @param handler handler for each persisted `event`
*/
final def persist[A](event: A, handler: Procedure[A]): Unit =
persist(event)(event ⇒ handler(event))
/**
* Java API: asynchronously persists `events` in specified order. This is equivalent to calling
* `persist[A](event: A, handler: Procedure[A])` multiple times with the same `handler`,
* except that `events` are persisted atomically with this method.
*
* @param events events to be persisted.
* @param handler handler for each persisted `events`
*/
final def persist[A](events: JIterable[A], handler: Procedure[A]): Unit =
persist(Util.immutableSeq(events))(event ⇒ handler(event))
}

View file

@@ -386,3 +386,62 @@ case class RecoveryException(message: String, cause: Throwable) extends AkkaExce
* @see [[PersistentBatch]]
*/
abstract class UntypedProcessor extends UntypedActor with Processor
/**
* Java API: compatible with lambda expressions (to be used with [[akka.japi.pf.ReceiveBuilder]]).
* An actor that persists (journals) messages of type [[Persistent]]. Messages of other types
* are not persisted.
*
* {{{
* import akka.persistence.AbstractProcessor;
* import akka.persistence.Persistent;
* import akka.actor.ActorRef;
* import akka.actor.Props;
* import akka.japi.pf.ReceiveBuilder;
* import scala.PartialFunction;
* import scala.runtime.BoxedUnit;
*
* class MyProcessor extends AbstractProcessor {
* public PartialFunction<Object, BoxedUnit> receive() {
* return ReceiveBuilder.
* match(Persistent.class, p -> {
* Object payload = p.payload();
* Long sequenceNr = p.sequenceNr();
* // ...
* }).build();
* }
* }
*
* // ...
*
* ActorRef processor = context().actorOf(Props.create(MyProcessor.class), "myProcessor");
*
* processor.tell(Persistent.create("foo"), null);
* processor.tell("bar", null);
* }}}
*
* During start and restart, persistent messages are replayed to a processor so that it can recover internal
* state from these messages. New messages sent to a processor during recovery do not interfere with replayed
* messages, hence applications don't need to wait for a processor to complete its recovery.
*
* Automated recovery can be turned off or customized by overriding the [[preStart]] and [[preRestart]] life
* cycle hooks. If automated recovery is turned off, an application can explicitly recover a processor by
* sending it a [[Recover]] message.
*
* [[Persistent]] messages are assigned sequence numbers that are generated on a per-processor basis. A sequence
* starts at `1L` and doesn't contain gaps unless a processor (logically) deletes a message.
*
* During recovery, a processor internally buffers new messages until recovery completes, so that new messages
* do not interfere with replayed messages. This internal buffer (the ''processor stash'') is isolated from the
* ''user stash'' inherited by `akka.actor.Stash`. `Processor` implementation classes can therefore use the
* ''user stash'' for stashing/unstashing both persistent and transient messages.
*
* Processors can also store snapshots of internal state by calling [[saveSnapshot]]. During recovery, a saved
* snapshot is offered to the processor with a [[SnapshotOffer]] message, followed by replayed messages, if any,
* that are younger than the snapshot. Default is to offer the latest saved snapshot.
*
* @see [[Processor]]
* @see [[Recover]]
* @see [[PersistentBatch]]
*/
abstract class AbstractProcessor extends AbstractActor with Processor

View file

@@ -197,3 +197,10 @@ trait View extends Actor with Recovery {
* @see [[View]]
*/
abstract class UntypedView extends UntypedActor with View
/**
* Java API: compatible with lambda expressions (to be used with [[akka.japi.pf.ReceiveBuilder]])
*
* @see [[View]]
*/
abstract class AbstractView extends AbstractActor with View

View file

@@ -0,0 +1,59 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<groupId>sample</groupId>
<artifactId>akka-sample-persistence-java8</artifactId>
<packaging>jar</packaging>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.10</artifactId>
<version>2.3-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-persistence-experimental_2.10</artifactId>
<version>2.3-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_2.10</artifactId>
<version>2.3-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<fork>true</fork>
<compilerArgs>
<arg>-Xlint</arg>
</compilerArgs>
</configuration>
</plugin>
</plugins>
</build>
</project>

View file

@@ -0,0 +1,389 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package doc;
import java.util.concurrent.TimeUnit;
import akka.japi.pf.ReceiveBuilder;
import scala.Option;
import scala.PartialFunction;
import scala.concurrent.duration.Duration;
import akka.actor.*;
import akka.persistence.*;
import scala.runtime.BoxedUnit;
import static java.util.Arrays.asList;
public class LambdaPersistenceDocTest {
public interface ProcessorMethods {
//#processor-id
public String processorId();
//#processor-id
//#recovery-status
public boolean recoveryRunning();
public boolean recoveryFinished();
//#recovery-status
//#current-message
public Persistent getCurrentPersistentMessage();
//#current-message
}
static Object o1 = new Object() {
//#definition
class MyProcessor extends AbstractProcessor {
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(Persistent.class, p -> {
// message successfully written to journal
Object payload = p.payload();
Long sequenceNr = p.sequenceNr();
// ...
}).
match(PersistenceFailure.class, failure -> {
// message failed to be written to journal
Object payload = failure.payload();
Long sequenceNr = failure.sequenceNr();
Throwable cause = failure.cause();
// ...
}).
matchAny(otherwise -> {
// message not written to journal
}).build();
}
}
//#definition
class MyActor extends AbstractActor {
ActorRef processor;
public MyActor() {
//#usage
processor = context().actorOf(Props.create(MyProcessor.class), "myProcessor");
processor.tell(Persistent.create("foo"), null);
processor.tell("bar", null);
//#usage
}
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(Persistent.class, received -> {/* ... */}).build();
}
private void recover() {
//#recover-explicit
processor.tell(Recover.create(), null);
//#recover-explicit
}
}
};
static Object o2 = new Object() {
abstract class MyProcessor1 extends AbstractProcessor {
//#recover-on-start-disabled
@Override
public void preStart() {}
//#recover-on-start-disabled
//#recover-on-restart-disabled
@Override
public void preRestart(Throwable reason, Option<Object> message) {}
//#recover-on-restart-disabled
}
abstract class MyProcessor2 extends AbstractProcessor {
//#recover-on-start-custom
@Override
public void preStart() {
self().tell(Recover.create(457L), null);
}
//#recover-on-start-custom
}
abstract class MyProcessor3 extends AbstractProcessor {
//#deletion
@Override
public void preRestart(Throwable reason, Option<Object> message) {
if (message.isDefined() && message.get() instanceof Persistent) {
deleteMessage(((Persistent) message.get()).sequenceNr());
}
super.preRestart(reason, message);
}
//#deletion
}
class MyProcessor4 extends AbstractProcessor implements ProcessorMethods {
//#processor-id-override
@Override
public String processorId() {
return "my-stable-processor-id";
}
//#processor-id-override
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(Persistent.class, received -> {/* ... */}).build();
}
}
};
static Object o3 = new Object() {
//#channel-example
class MyProcessor extends AbstractProcessor {
private final ActorRef destination;
private final ActorRef channel;
public MyProcessor() {
this.destination = context().actorOf(Props.create(MyDestination.class));
this.channel = context().actorOf(Channel.props(), "myChannel");
}
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(Persistent.class, p -> {
Persistent out = p.withPayload("done " + p.payload());
channel.tell(Deliver.create(out, destination.path()), self());
}).build();
}
}
class MyDestination extends AbstractActor {
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(ConfirmablePersistent.class, p -> {
Object payload = p.payload();
Long sequenceNr = p.sequenceNr();
int redeliveries = p.redeliveries();
// ...
p.confirm();
}).build();
}
}
//#channel-example
class MyProcessor2 extends AbstractProcessor {
private final ActorRef destination;
private final ActorRef channel;
public MyProcessor2(ActorRef destination) {
this.destination = context().actorOf(Props.create(MyDestination.class));
//#channel-id-override
this.channel = context().actorOf(Channel.props("my-stable-channel-id"));
//#channel-id-override
//#channel-custom-settings
context().actorOf(
Channel.props(ChannelSettings.create()
.withRedeliverInterval(Duration.create(30, TimeUnit.SECONDS))
.withRedeliverMax(15)));
//#channel-custom-settings
//#channel-custom-listener
class MyListener extends AbstractActor {
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(RedeliverFailure.class, r -> {
Iterable<ConfirmablePersistent> messages = r.getMessages();
// ...
}).build();
}
}
final ActorRef myListener = context().actorOf(Props.create(MyListener.class));
context().actorOf(Channel.props(
ChannelSettings.create().withRedeliverFailureListener(null)));
//#channel-custom-listener
}
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(Persistent.class, p -> {
Persistent out = p.withPayload("done " + p.payload());
channel.tell(Deliver.create(out, destination.path()), self());
//#channel-example-reply
channel.tell(Deliver.create(out, sender().path()), self());
//#channel-example-reply
}).build();
}
}
};
static Object o4 = new Object() {
//#save-snapshot
class MyProcessor extends AbstractProcessor {
private Object state;
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(String.class, s -> s.equals("snap"),
s -> saveSnapshot(state)).
match(SaveSnapshotSuccess.class, ss -> {
SnapshotMetadata metadata = ss.metadata();
// ...
}).
match(SaveSnapshotFailure.class, sf -> {
SnapshotMetadata metadata = sf.metadata();
// ...
}).build();
}
}
//#save-snapshot
};
static Object o5 = new Object() {
//#snapshot-offer
class MyProcessor extends AbstractProcessor {
private Object state;
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(SnapshotOffer.class, s -> {
state = s.snapshot();
// ...
}).
match(Persistent.class, p -> {/* ...*/}).build();
}
}
//#snapshot-offer
class MyActor extends AbstractActor {
ActorRef processor;
public MyActor() {
processor = context().actorOf(Props.create(MyProcessor.class));
}
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.match(Object.class, o -> {/* ... */}).build();
}
private void recover() {
//#snapshot-criteria
processor.tell(Recover.create(
SnapshotSelectionCriteria
.create(457L, System.currentTimeMillis())), null);
//#snapshot-criteria
}
}
};
static Object o6 = new Object() {
//#batch-write
class MyProcessor extends AbstractProcessor {
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(Persistent.class, p -> p.payload().equals("a"),
p -> {/* ... */}).
match(Persistent.class, p -> p.payload().equals("b"),
p -> {/* ... */}).build();
}
}
class Example {
final ActorSystem system = ActorSystem.create("example");
final ActorRef processor = system.actorOf(Props.create(MyProcessor.class));
public void batchWrite() {
processor.tell(PersistentBatch
.create(asList(Persistent.create("a"),
Persistent.create("b"))), null);
}
// ...
}
//#batch-write
};
static Object o7 = new Object() {
abstract class MyProcessor extends AbstractProcessor {
ActorRef destination;
public void foo() {
//#persistent-channel-example
final ActorRef channel = context().actorOf(
PersistentChannel.props(
PersistentChannelSettings.create()
.withRedeliverInterval(Duration.create(30, TimeUnit.SECONDS))
.withRedeliverMax(15)),
"myPersistentChannel");
channel.tell(Deliver.create(Persistent.create("example"), destination.path()), self());
//#persistent-channel-example
//#persistent-channel-watermarks
PersistentChannelSettings.create()
.withPendingConfirmationsMax(10000)
.withPendingConfirmationsMin(2000);
//#persistent-channel-watermarks
//#persistent-channel-reply
PersistentChannelSettings.create().withReplyPersistent(true);
//#persistent-channel-reply
}
}
};
static Object o8 = new Object() {
//#reliable-event-delivery
class MyEventsourcedProcessor extends AbstractEventsourcedProcessor {
private ActorRef destination;
private ActorRef channel;
public MyEventsourcedProcessor(ActorRef destination) {
this.destination = destination;
this.channel = context().actorOf(Channel.props(), "channel");
}
private void handleEvent(String event) {
// update state
// ...
// reliably deliver events
channel.tell(Deliver.create(
Persistent.create(event, getCurrentPersistentMessage()),
destination.path()), self());
}
@Override public PartialFunction<Object, BoxedUnit> receiveRecover() {
return ReceiveBuilder.
match(String.class, this::handleEvent).build();
}
@Override public PartialFunction<Object, BoxedUnit> receiveCommand() {
return ReceiveBuilder.
match(String.class, s -> s.equals("cmd"),
s -> persist("evt", this::handleEvent)).build();
}
}
//#reliable-event-delivery
};
static Object o9 = new Object() {
//#view
class MyView extends AbstractView {
@Override
public String processorId() {
return "some-processor-id";
}
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(Persistent.class, persistent -> {
// ...
}).build();
}
}
//#view
public void usage() {
final ActorSystem system = ActorSystem.create("example");
//#view-update
final ActorRef view = system.actorOf(Props.create(MyView.class));
view.tell(Update.create(true), null);
//#view-update
}
};
}

View file

@@ -0,0 +1,113 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package doc;
//#plugin-imports
import akka.japi.pf.ReceiveBuilder;
import scala.PartialFunction;
import scala.concurrent.Future;
import akka.japi.Option;
import akka.japi.Procedure;
import akka.persistence.*;
import akka.persistence.journal.japi.*;
import akka.persistence.snapshot.japi.*;
//#plugin-imports
import akka.actor.*;
import akka.persistence.journal.leveldb.SharedLeveldbJournal;
import akka.persistence.journal.leveldb.SharedLeveldbStore;
import scala.runtime.BoxedUnit;
public class LambdaPersistencePluginDocTest {
static Object o1 = new Object() {
final ActorSystem system = null;
//#shared-store-creation
final ActorRef store = system.actorOf(Props.create(SharedLeveldbStore.class), "store");
//#shared-store-creation
//#shared-store-usage
class SharedStorageUsage extends AbstractActor {
@Override
public void preStart() throws Exception {
String path = "akka.tcp://example@127.0.0.1:2552/user/store";
ActorSelection selection = context().actorSelection(path);
selection.tell(new Identify(1), self());
}
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(ActorIdentity.class, ai -> {
if (ai.correlationId().equals(1)) {
ActorRef store = ai.getRef();
if (store != null) {
SharedLeveldbJournal.setStore(store, context().system());
}
}
}).build();
}
}
//#shared-store-usage
};
class MySnapshotStore extends SnapshotStore {
@Override
public Future<Option<SelectedSnapshot>> doLoadAsync(String processorId, SnapshotSelectionCriteria criteria) {
return null;
}
@Override
public Future<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot) {
return null;
}
@Override
public void onSaved(SnapshotMetadata metadata) throws Exception {
}
@Override
public void doDelete(SnapshotMetadata metadata) throws Exception {
}
@Override
public void doDelete(String processorId, SnapshotSelectionCriteria criteria) throws Exception {
}
}
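// Skeleton of an asynchronous write journal plugin; the method bodies are left as stubs.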
class MyAsyncJournal extends AsyncWriteJournal {
@Override
public Future<Void> doAsyncWriteMessages(Iterable<PersistentRepr> messages) {
return null;
}
@Override
public Future<Void> doAsyncWriteConfirmations(Iterable<PersistentConfirmation> confirmations) {
return null;
}
@Override
public Future<Void> doAsyncDeleteMessages(Iterable<PersistentId> messageIds, boolean permanent) {
return null;
}
@Override
public Future<Void> doAsyncDeleteMessagesTo(String processorId, long toSequenceNr, boolean permanent) {
return null;
}
@Override
public Future<Void> doAsyncReplayMessages(String processorId, long fromSequenceNr,
long toSequenceNr,
long max,
Procedure<PersistentRepr> replayCallback) {
return null;
}
@Override
public Future<Long> doAsyncReadHighestSequenceNr(String processorId, long fromSequenceNr) {
return null;
}
}
}

View file

@ -0,0 +1,74 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.persistence;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import akka.persistence.*;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;
public class ConversationRecoveryExample {
public static String PING = "PING";
public static String PONG = "PONG";
public static class Ping extends AbstractProcessor {
final ActorRef pongChannel = context().actorOf(Channel.props(), "pongChannel");
int counter = 0;
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(ConfirmablePersistent.class, cp -> cp.payload().equals(PING), cp -> {
counter += 1;
System.out.println(String.format("received ping %d times", counter));
cp.confirm();
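// slow down the exchange while running live, but not during replay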
if (!recoveryRunning()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
pongChannel.tell(Deliver.create(cp.withPayload(PONG), sender().path()), self());
}).
match(String.class,
s -> s.equals("init"),
s -> pongChannel.tell(Deliver.create(Persistent.create(PONG), sender().path()), self())).build();
}
}
public static class Pong extends AbstractProcessor {
private final ActorRef pingChannel = context().actorOf(Channel.props(), "pingChannel");
private int counter = 0;
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(ConfirmablePersistent.class, cp -> cp.payload().equals(PONG), cp -> {
counter += 1;
System.out.println(String.format("received pong %d times", counter));
cp.confirm();
if (!recoveryRunning()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
pingChannel.tell(Deliver.create(cp.withPayload(PING), sender().path()), self());
}).build();
}
}
public static void main(String... args) throws Exception {
final ActorSystem system = ActorSystem.create("example");
final ActorRef ping = system.actorOf(Props.create(Ping.class), "ping");
final ActorRef pong = system.actorOf(Props.create(Pong.class), "pong");
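// start the conversation; `pong` is passed as sender, so the first PONG is delivered to it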
ping.tell("init", pong);
}
}

View file

@ -0,0 +1,132 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.persistence;
//#eventsourced-example
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import akka.persistence.AbstractEventsourcedProcessor;
import akka.persistence.SnapshotOffer;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;
import java.io.Serializable;
import java.util.ArrayList;
import static java.util.Arrays.asList;
class Cmd implements Serializable {
private final String data;
public Cmd(String data) {
this.data = data;
}
public String getData() {
return data;
}
}
class Evt implements Serializable {
private final String data;
public Evt(String data) {
this.data = data;
}
public String getData() {
return data;
}
}
class ExampleState implements Serializable {
private final ArrayList<String> events;
public ExampleState() {
this(new ArrayList<String>());
}
public ExampleState(ArrayList<String> events) {
this.events = events;
}
public ExampleState copy() {
return new ExampleState(new ArrayList<String>(events));
}
public void update(Evt evt) {
events.add(evt.getData());
}
public int size() {
return events.size();
}
@Override
public String toString() {
return events.toString();
}
}
class ExampleProcessor extends AbstractEventsourcedProcessor {
private ExampleState state = new ExampleState();
public int getNumEvents() {
return state.size();
}
@Override public PartialFunction<Object, BoxedUnit> receiveRecover() {
return ReceiveBuilder.
match(Evt.class, state::update).
match(SnapshotOffer.class, ss -> state = (ExampleState) ss.snapshot()).build();
}
@Override public PartialFunction<Object, BoxedUnit> receiveCommand() {
return ReceiveBuilder.match(Cmd.class, c -> {
final String data = c.getData();
final Evt evt1 = new Evt(data + "-" + getNumEvents());
final Evt evt2 = new Evt(data + "-" + (getNumEvents() + 1));
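// persist both events; the handler runs once per event after it has been written to the journal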
persist(asList(evt1, evt2), (Evt evt) -> {
state.update(evt);
if (evt.equals(evt2)) {
context().system().eventStream().publish(evt);
if (data.equals("foo")) { context().become(och, true); }
}
});
}).
match(String.class, s -> s.equals("snap"), s -> saveSnapshot(state.copy())).
match(String.class, s -> s.equals("print"), s -> System.out.println(state)).build();
}
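// alternative command handler installed after a "foo" command:
// handles "bar" and stashes everything else until then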
PartialFunction<Object, BoxedUnit> och = ReceiveBuilder.
match(Cmd.class, cmd -> cmd.getData().equals("bar"), cmd -> {
persist(new Evt("bar-" + getNumEvents()), event -> {
state.update(event);
context().unbecome();
});
unstashAll();
}).
matchAny(o -> stash()).build();
}
//#eventsourced-example
public class EventsourcedExample {
public static void main(String... args) throws Exception {
final ActorSystem system = ActorSystem.create("example");
final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class), "processor-4-java8");
processor.tell(new Cmd("foo"), null);
processor.tell(new Cmd("baz"), null);
processor.tell(new Cmd("bar"), null);
processor.tell("snap", null);
processor.tell(new Cmd("buzz"), null);
processor.tell("print", null);
Thread.sleep(1000);
system.shutdown();
}
}

View file

@ -0,0 +1,58 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.persistence;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import akka.persistence.*;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;
public class ProcessorChannelExample {
public static class ExampleProcessor extends AbstractProcessor {
private ActorRef destination;
private ActorRef channel;
public ExampleProcessor(ActorRef destination) {
this.destination = destination;
this.channel = context().actorOf(Channel.props(), "channel");
}
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(Persistent.class, p -> {
System.out.println("processed " + p.payload());
channel.tell(Deliver.create(p.withPayload("processed " + p.payload()), destination.path()), self());
}).
match(String.class, s -> System.out.println("reply = " + s)).build();
}
}
public static class ExampleDestination extends AbstractActor {
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(ConfirmablePersistent.class, cp -> {
System.out.println("received " + cp.payload());
sender().tell(String.format("re: %s (%d)", cp.payload(), cp.sequenceNr()), null);
cp.confirm();
}).build();
}
}
public static void main(String... args) throws Exception {
final ActorSystem system = ActorSystem.create("example");
final ActorRef destination = system.actorOf(Props.create(ExampleDestination.class));
final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class, destination), "processor-1");
processor.tell(Persistent.create("a"), null);
processor.tell(Persistent.create("b"), null);
Thread.sleep(1000);
system.shutdown();
}
}

View file

@ -0,0 +1,72 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.persistence;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import akka.persistence.AbstractProcessor;
import akka.persistence.Persistent;
import scala.Option;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;
import java.util.ArrayList;
public class ProcessorFailureExample {
public static class ExampleProcessor extends AbstractProcessor {
private ArrayList<Object> received = new ArrayList<Object>();
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(Persistent.class, p -> p.payload().equals("boom"), p -> {throw new RuntimeException("boom");}).
match(Persistent.class, p -> !p.payload().equals("boom"), p -> received.add(p.payload())).
match(String.class, s -> s.equals("boom"), s -> {throw new RuntimeException("boom");}).
match(String.class, s -> s.equals("print"), s -> System.out.println("received " + received)).build();
}
@Override
public void preRestart(Throwable reason, Option<Object> message) {
if (message.isDefined() && message.get() instanceof Persistent) {
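// delete the message that caused the failure from the journal so it is not replayed after restart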
deleteMessage(((Persistent) message.get()).sequenceNr(), false);
}
super.preRestart(reason, message);
}
}
public static void main(String... args) throws Exception {
final ActorSystem system = ActorSystem.create("example");
final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class), "processor-2");
processor.tell(Persistent.create("a"), null);
processor.tell("print", null);
processor.tell("boom", null);
processor.tell("print", null);
processor.tell(Persistent.create("b"), null);
processor.tell("print", null);
processor.tell(Persistent.create("boom"), null);
processor.tell("print", null);
processor.tell(Persistent.create("c"), null);
processor.tell("print", null);
// Will print in a first run (i.e. with empty journal):
// received [a]
// received [a, b]
// received [a, b, c]
// Will print in a second run:
// received [a, b, c, a]
// received [a, b, c, a, b]
// received [a, b, c, a, b, c]
// etc ...
Thread.sleep(1000);
system.shutdown();
}
}

View file

@ -0,0 +1,79 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.persistence;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import akka.persistence.AbstractProcessor;
import akka.persistence.Persistent;
import akka.persistence.SnapshotOffer;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;
import java.io.Serializable;
import java.util.ArrayList;
public class SnapshotExample {
public static class ExampleState implements Serializable {
private final ArrayList<String> received;
public ExampleState() {
this(new ArrayList<String>());
}
public ExampleState(ArrayList<String> received) {
this.received = received;
}
public ExampleState copy() {
return new ExampleState(new ArrayList<String>(received));
}
public void update(String s) {
received.add(s);
}
@Override
public String toString() {
return received.toString();
}
}
public static class ExampleProcessor extends AbstractProcessor {
private ExampleState state = new ExampleState();
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(Persistent.class, p -> state.update(String.format("%s-%d", p.payload(), p.sequenceNr()))).
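// during recovery the latest saved snapshot (if any) is offered before remaining messages are replayed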
match(SnapshotOffer.class, s -> {
ExampleState exState = (ExampleState) s.snapshot();
System.out.println("offered state = " + exState);
state = exState;
}).
match(String.class, s -> s.equals("print"), s -> System.out.println("current state = " + state)).
match(String.class, s -> s.equals("snap"), s ->
// IMPORTANT: save a copy of the state,
// because ExampleState is mutable
saveSnapshot(state.copy())).build();
}
}
public static void main(String... args) throws Exception {
final ActorSystem system = ActorSystem.create("example");
final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class), "processor-3-java");
processor.tell(Persistent.create("a"), null);
processor.tell(Persistent.create("b"), null);
processor.tell("snap", null);
processor.tell(Persistent.create("c"), null);
processor.tell(Persistent.create("d"), null);
processor.tell("print", null);
Thread.sleep(1000);
system.shutdown();
}
}

View file

@ -0,0 +1,100 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.persistence;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import akka.persistence.*;
import scala.PartialFunction;
import scala.concurrent.duration.Duration;
import scala.runtime.BoxedUnit;
import java.util.concurrent.TimeUnit;
public class ViewExample {
public static class ExampleProcessor extends AbstractProcessor {
@Override
public String processorId() {
return "processor-5";
}
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(Persistent.class,
p -> System.out.println(String.format("processor received %s (sequence nr = %d)",
p.payload(),
p.sequenceNr()))).build();
}
}
public static class ExampleView extends AbstractView {
private final ActorRef destination = context().actorOf(Props.create(ExampleDestination.class));
private final ActorRef channel = context().actorOf(Channel.props("channel"));
private int numReplicated = 0;
@Override
public String viewId() {
return "view-5";
}
@Override
public String processorId() {
return "processor-5";
}
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(Persistent.class, p -> {
numReplicated += 1;
System.out.println(String.format("view received %s (sequence nr = %d, num replicated = %d)",
p.payload(),
p.sequenceNr(),
numReplicated));
channel.tell(Deliver.create(p.withPayload("replicated-" + p.payload()), destination.path()),
self());
}).
match(SnapshotOffer.class, so -> {
numReplicated = (Integer) so.snapshot();
System.out.println(String.format("view received snapshot offer %s (metadata = %s)",
numReplicated,
so.metadata()));
}).
match(String.class, s -> s.equals("snap"), s -> saveSnapshot(numReplicated)).build();
}
}
public static class ExampleDestination extends AbstractActor {
@Override public PartialFunction<Object, BoxedUnit> receive() {
return ReceiveBuilder.
match(ConfirmablePersistent.class, cp -> {
System.out.println(String.format("destination received %s (sequence nr = %s)",
cp.payload(),
cp.sequenceNr()));
cp.confirm();
}).build();
}
}
public static void main(String... args) throws Exception {
final ActorSystem system = ActorSystem.create("example");
final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class));
final ActorRef view = system.actorOf(Props.create(ExampleView.class));
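// send a persistent message to the processor every 2 seconds
// and tell the view to snapshot its state every 5 seconds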
system.scheduler()
.schedule(Duration.Zero(),
Duration.create(2, TimeUnit.SECONDS),
processor,
Persistent.create("scheduled"),
system.dispatcher(),
null);
system.scheduler()
.schedule(Duration.Zero(), Duration.create(5, TimeUnit.SECONDS), view, "snap", system.dispatcher(), null);
}
}

View file

@ -0,0 +1,6 @@
akka.persistence.journal.leveldb.dir = "target/example/journal"
akka.persistence.snapshot-store.local.dir = "target/example/snapshots"
# DO NOT USE THIS IN PRODUCTION !!!
# See also https://github.com/typesafehub/activator/issues/287
akka.persistence.journal.leveldb.native = false