diff --git a/akka-docs/rst/java/index-actors.rst b/akka-docs/rst/java/index-actors.rst index d343351eec..3bb88e0095 100644 --- a/akka-docs/rst/java/index-actors.rst +++ b/akka-docs/rst/java/index-actors.rst @@ -15,3 +15,4 @@ Actors testing lambda-actors lambda-fsm + lambda-persistence diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst new file mode 100644 index 0000000000..5637608196 --- /dev/null +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -0,0 +1,657 @@ +.. _persistence-lambda-java: + +###################################### +Persistence (Java with Lambda Support) +###################################### + + +Akka persistence enables stateful actors to persist their internal state so that it can be recovered when an actor +is started, restarted after a JVM crash or by a supervisor, or migrated in a cluster. The key concept behind Akka +persistence is that only changes to an actor's internal state are persisted but never its current state directly +(except for optional snapshots). These changes are only ever appended to storage, nothing is ever mutated, which +allows for very high transaction rates and efficient replication. Stateful actors are recovered by replaying stored +changes to these actors from which they can rebuild internal state. This can be either the full history of changes +or starting from a snapshot which can dramatically reduce recovery times. Akka persistence also provides point-to-point +communication channels with at-least-once message delivery semantics. + +.. warning:: + + This module is marked as **“experimental”** as of its introduction in Akka 2.3.0. We will continue to + improve this API based on our users’ feedback, which implies that while we try to keep incompatible + changes to a minimum the binary compatibility guarantee for maintenance releases does not apply to the + contents of the ``akka.persistence`` package. + +Akka persistence is inspired by the `eventsourced`_ library. It follows the same concepts and architecture of +`eventsourced`_ but significantly differs on API and implementation level. + +.. _eventsourced: https://github.com/eligosource/eventsourced + +Dependencies +============ + +Akka persistence is a separate jar file. Make sure that you have the following dependency in your project:: + + + com.typesafe.akka + akka-persistence-experimental_@binVersion@ + @version@ + + +Architecture +============ + +* *Processor*: A processor is a persistent, stateful actor. Messages sent to a processor are written to a journal + before its ``receive`` method is called. When a processor is started or restarted, journaled messages are replayed + to that processor, so that it can recover internal state from these messages. + +* *View*: A view is a persistent, stateful actor that receives journaled messages that have been written by another + processor. A view itself does not journal new messages, instead, it updates internal state only from a processor's + replicated message stream. + +* *Channel*: Channels are used by processors and views to communicate with other actors. They prevent that replayed + messages are redundantly delivered to these actors and provide at-least-once message delivery semantics, also in + case of sender and receiver JVM crashes. + +* *Journal*: A journal stores the sequence of messages sent to a processor. An application can control which messages + are journaled and which are received by the processor without being journaled. The storage backend of a journal is + pluggable. The default journal storage plugin writes to the local filesystem, replicated journals are available as + `Community plugins`_. + +* *Snapshot store*: A snapshot store persists snapshots of a processor's or a view's internal state. Snapshots are + used for optimizing recovery times. The storage backend of a snapshot store is pluggable. The default snapshot + storage plugin writes to the local filesystem. + +* *Event sourcing*. Based on the building blocks described above, Akka persistence provides abstractions for the + development of event sourced applications (see section :ref:`event-sourcing-java-lambda`) + +.. _Community plugins: https://gist.github.com/krasserm/8612920#file-akka-persistence-plugins-md + +.. _processors-lambda-java: + +Processors +========== + +A processor can be implemented by extending ``AbstractProcessor`` class and implementing the +``receive`` method. + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#definition + +Processors only write messages of type ``Persistent`` to the journal, others are received without being persisted. +When a processor's ``receive`` method is called with a ``Persistent`` message it can safely assume that this message +has been successfully written to the journal. If a journal fails to write a ``Persistent`` message then the processor +is stopped, by default. If a processor should continue running on persistence failures it must handle +``PersistenceFailure`` messages. In this case, a processor may want to inform the sender about the failure, +so that the sender can re-send the message, if needed. + +An ``AbstractProcessor`` itself is an ``Actor`` and can therefore be instantiated with ``actorOf``. + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#usage + +Recovery +-------- + +By default, a processor is automatically recovered on start and on restart by replaying journaled messages. +New messages sent to a processor during recovery do not interfere with replayed messages. New messages will +only be received by a processor after recovery completes. + +Recovery customization +^^^^^^^^^^^^^^^^^^^^^^ + +Automated recovery on start can be disabled by overriding ``preStart`` with an empty implementation. + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#recover-on-start-disabled + +In this case, a processor must be recovered explicitly by sending it a ``Recover`` message. + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#recover-explicit + +If not overridden, ``preStart`` sends a ``Recover`` message to ``self()``. Applications may also override +``preStart`` to define further ``Recover`` parameters such as an upper sequence number bound, for example. + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#recover-on-start-custom + +Upper sequence number bounds can be used to recover a processor to past state instead of current state. Automated +recovery on restart can be disabled by overriding ``preRestart`` with an empty implementation. + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#recover-on-restart-disabled + +Recovery status +^^^^^^^^^^^^^^^ + +A processor can query its own recovery status via the methods + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#recovery-status + +.. _failure-handling-java-lambda: + +Failure handling +^^^^^^^^^^^^^^^^ + +A persistent message that caused an exception will be received again by a processor after restart. To prevent +a replay of that message during recovery it can be deleted. + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#deletion + +Message deletion +---------------- + +A processor can delete a single message by calling the ``deleteMessage`` method with the sequence number of +that message as argument. An optional ``permanent`` parameter specifies whether the message shall be permanently +deleted from the journal or only marked as deleted. In both cases, the message won't be replayed. Later extensions +to Akka persistence will allow to replay messages that have been marked as deleted which can be useful for debugging +purposes, for example. To delete all messages (journaled by a single processor) up to a specified sequence number, +processors should call the ``deleteMessages`` method. + +Identifiers +----------- + +A processor must have an identifier that doesn't change across different actor incarnations. It defaults to the +``String`` representation of processor's path without the address part and can be obtained via the ``processorId`` +method. + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#processor-id + +Applications can customize a processor's id by specifying an actor name during processor creation as shown in +section :ref:`processors-java`. This changes that processor's name in its actor hierarchy and hence influences only +part of the processor id. To fully customize a processor's id, the ``processorId`` method must be overridden. + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#processor-id-override + +Overriding ``processorId`` is the recommended way to generate stable identifiers. + +.. _views-java-lambda: + +Views +===== + +Views can be implemented by extending the ``AbstractView`` abstract class and implementing the ``receive`` and the +``processorId`` +methods. + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#view + +The ``processorId`` identifies the processor from which the view receives journaled messages. It is not necessary +the referenced processor is actually running. Views read messages from a processor's journal directly. When a +processor is started later and begins to write new messages, the corresponding view is updated automatically, by +default. + +Updates +------- + +The default update interval of all views of an actor system is configurable: + +.. includecode:: ../scala/code/docs/persistence/PersistenceDocSpec.scala#auto-update-interval + +``View`` implementation classes may also override the ``autoUpdateInterval`` method to return a custom update +interval for a specific view class or view instance. Applications may also trigger additional updates at +any time by sending a view an ``Update`` message. + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#view-update + +If the ``await`` parameter is set to ``true``, messages that follow the ``Update`` request are processed when the +incremental message replay, triggered by that update request, completed. If set to ``false`` (default), messages +following the update request may interleave with the replayed message stream. Automated updates always run with +``await = false``. + +Automated updates of all views of an actor system can be turned off by configuration: + +.. includecode:: ../scala/code/docs/persistence/PersistenceDocSpec.scala#auto-update + +Implementation classes may override the configured default value by overriding the ``autoUpdate`` method. To +limit the number of replayed messages per update request, applications can configure a custom +``akka.persistence.view.auto-update-replay-max`` value or override the ``autoUpdateReplayMax`` method. The number +of replayed messages for manual updates can be limited with the ``replayMax`` parameter of the ``Update`` message. + +Recovery +-------- + +Initial recovery of views works in the very same way as for :ref:`processors` (i.e. by sending a ``Recover`` message +to self). The maximum number of replayed messages during initial recovery is determined by ``autoUpdateReplayMax``. +Further possibilities to customize initial recovery are explained in section :ref:`processors-java`. + +Identifiers +----------- + +A view must have an identifier that doesn't change across different actor incarnations. It defaults to the +``String`` representation of the actor path without the address part and can be obtained via the ``viewId`` +method. + +Applications can customize a view's id by specifying an actor name during view creation. This changes that view's +name in its actor hierarchy and hence influences only part of the view id. To fully customize a view's id, the +``viewId`` method must be overridden. Overriding ``viewId`` is the recommended way to generate stable identifiers. + +The ``viewId`` must differ from the referenced ``processorId``, unless :ref:`snapshots-java` of a view and its +processor shall be shared (which is what applications usually do not want). + +.. _channels-java-lambda: + +Channels +======== + +Channels are special actors that are used by processors or views to communicate with other actors (channel +destinations). The following discusses channels in context of processors but this is also applicable to views. + +Channels prevent redundant delivery of replayed messages to destinations during processor recovery. A replayed +message is retained by a channel if its delivery has been confirmed by a destination. + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#channel-example + +A channel is ready to use once it has been created, no recovery or further activation is needed. A ``Deliver`` +request instructs a channel to send a ``Persistent`` message to a destination. A destination is provided as +``ActorPath`` and messages are sent by the channel via that path's ``ActorSelection``. Sender references are +preserved by a channel, therefore, a destination can reply to the sender of a ``Deliver`` request. + +If a processor wants to reply to a ``Persistent`` message sender it should use the ``sender()`` path as +channel destination. + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#channel-example-reply + +Persistent messages delivered by a channel are of type ``ConfirmablePersistent``. ``ConfirmablePersistent`` extends +``Persistent`` by adding the methods ``confirm`` and ``redeliveries`` (see also :ref:`redelivery-java-lambda`). A +channel destination confirms the delivery of a ``ConfirmablePersistent`` message by calling ``confirm()`` on that +message. This asynchronously writes a confirmation entry to the journal. Replayed messages internally contain +confirmation entries which allows a channel to decide if it should retain these messages or not. + +A ``Processor`` can also be used as channel destination i.e. it can persist ``ConfirmablePersistent`` messages too. + +.. _redelivery-java-lambda: + +Message re-delivery +------------------- + +Channels re-deliver messages to destinations if they do not confirm delivery within a configurable timeout. +This timeout can be specified as ``redeliverInterval`` when creating a channel, optionally together with the +maximum number of re-deliveries a channel should attempt for each unconfirmed message. The number of re-delivery +attempts can be obtained via the ``redeliveries`` method on ``ConfirmablePersistent``. + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#channel-custom-settings + +A channel keeps messages in memory until their successful delivery has been confirmed or the maximum number of +re-deliveries is reached. To be notified about messages that have reached the maximum number of re-deliveries, +applications can register a listener at channel creation. + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#channel-custom-listener + +A listener receives ``RedeliverFailure`` notifications containing all messages that could not be delivered. On +receiving a ``RedeliverFailure`` message, an application may decide to restart the sending processor to enforce +a re-send of these messages to the channel or confirm these messages to prevent further re-sends. The sending +processor can also be restarted any time later to re-send unconfirmed messages. + +This combination of + +* message persistence by sending processors +* message replays by sending processors +* message re-deliveries by channels and +* application-level confirmations (acknowledgements) by destinations + +enables channels to provide at-least-once message delivery semantics. Possible duplicates can be detected by +destinations by tracking message sequence numbers. Message sequence numbers are generated per sending processor. +Depending on how a processor routes outbound messages to destinations, they may either see a contiguous message +sequence or a sequence with gaps. + +.. warning:: + + If a processor emits more than one outbound message per inbound ``Persistent`` message it **must** use a + separate channel for each outbound message to ensure that confirmations are uniquely identifiable, otherwise, + at-least-once message delivery semantics do not apply. This rule has been introduced to avoid writing additional + outbound message identifiers to the journal which would decrease the overall throughput. It is furthermore + recommended to collapse multiple outbound messages to the same destination into a single outbound message, + otherwise, if sent via multiple channels, their ordering is not defined. + +If an application wants to have more control how sequence numbers are assigned to messages it should use an +application-specific sequence number generator and include the generated sequence numbers into the ``payload`` +of ``Persistent`` messages. + +Persistent channels +------------------- + +Channels created with ``Channel.props`` do not persist messages. These channels are usually used in combination +with a sending processor that takes care of persistence, hence, channel-specific persistence is not necessary in +this case. They are referred to as transient channels in the following. + +Persistent channels are like transient channels but additionally persist messages before delivering them. Messages +that have been persisted by a persistent channel are deleted when destinations confirm their delivery. A persistent +channel can be created with ``PersistentChannel.props`` and configured with a ``PersistentChannelSettings`` object. + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#persistent-channel-example + +A persistent channel is useful for delivery of messages to slow destinations or destinations that are unavailable +for a long time. It can constrain the number of pending confirmations based on the ``pendingConfirmationsMax`` +and ``pendingConfirmationsMin`` parameters of ``PersistentChannelSettings``. + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#persistent-channel-watermarks + +It suspends delivery when the number of pending confirmations reaches ``pendingConfirmationsMax`` and resumes +delivery again when this number falls below ``pendingConfirmationsMin``. This prevents both, flooding destinations +with more messages than they can process and unlimited memory consumption by the channel. A persistent channel +continues to persist new messages even when message delivery is temporarily suspended. + +Standalone usage +---------------- + +Applications may also use channels standalone. Transient channels can be used standalone if re-delivery attempts +to destinations are required but message loss in case of a sender JVM crash is not an issue. If message loss in +case of a sender JVM crash is an issue, persistent channels should be used. In this case, applications may want to +receive replies from the channel whether messages have been successfully persisted or not. This can be enabled by +creating the channel with the ``replyPersistent`` configuration parameter set to ``true``: + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#persistent-channel-reply + +With this setting, either the successfully persisted message is replied to the sender or a ``PersistenceFailure`` +message. In case the latter case, the sender should re-send the message. + +Identifiers +----------- + +In the same way as :ref:`processors-java` and :ref:`views-java`, channels also have an identifier that defaults to a channel's +path. A channel identifier can therefore be customized by using a custom actor name at channel creation. This changes +that channel's name in its actor hierarchy and hence influences only part of the channel identifier. To fully customize +a channel identifier, it should be provided as argument ``Channel.props(String)`` or ``PersistentChannel.props(String)`` +(recommended to generate stable identifiers). + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#channel-id-override + +Persistent messages +=================== + +Payload +------- + +The payload of a ``Persistent`` message can be obtained via its ``payload`` method. Inside processors, new messages +must be derived from the current persistent message before sending them via a channel, either by calling ``p.withPayload(...)`` +or ``Persistent.create(..., getCurrentPersistentMessage())`` where ``getCurrentPersistentMessage()`` is defined on +``AbstractProcessor``. + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#current-message + +This is necessary for delivery confirmations to work properly. Both +ways are equivalent but we recommend using ``p.withPayload(...)`` for clarity. It is not allowed to send a message +via a channel that has been created with ``Persistent.create(...)``. This would redeliver the message on every replay +even though its delivery was confirmed by a destination. + +Sequence number +--------------- + +The sequence number of a ``Persistent`` message can be obtained via its ``sequenceNr`` method. Persistent +messages are assigned sequence numbers on a per-processor basis (or per channel basis if used +standalone). A sequence starts at ``1L`` and doesn't contain gaps unless a processor deletes messages. + +.. _snapshots-java-lambda: + +Snapshots +========= + +Snapshots can dramatically reduce recovery times of processors and views. The following discusses snapshots +in context of processors but this is also applicable to views. + +Processors can save snapshots of internal state by calling the ``saveSnapshot`` method. If saving of a snapshot +succeeds, the processor receives a ``SaveSnapshotSuccess`` message, otherwise a ``SaveSnapshotFailure`` message + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#save-snapshot + +During recovery, the processor is offered a previously saved snapshot via a ``SnapshotOffer`` message from +which it can initialize internal state. + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#snapshot-offer + +The replayed messages that follow the ``SnapshotOffer`` message, if any, are younger than the offered snapshot. +They finally recover the processor to its current (i.e. latest) state. + +In general, a processor is only offered a snapshot if that processor has previously saved one or more snapshots +and at least one of these snapshots matches the ``SnapshotSelectionCriteria`` that can be specified for recovery. + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#snapshot-criteria + +If not specified, they default to ``SnapshotSelectionCriteria.latest()`` which selects the latest (= youngest) snapshot. +To disable snapshot-based recovery, applications should use ``SnapshotSelectionCriteria.none()``. A recovery where no +saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay all journaled messages. + +Snapshot deletion +----------------- + +A processor can delete individual snapshots by calling the ``deleteSnapshot`` method with the sequence number and the +timestamp of a snapshot as argument. To bulk-delete snapshots matching ``SnapshotSelectionCriteria``, processors should +use the ``deleteSnapshots`` method. + +.. _event-sourcing-java-lambda: + +Event sourcing +============== + +In all the examples so far, messages that change a processor's state have been sent as ``Persistent`` messages +by an application, so that they can be replayed during recovery. From this point of view, the journal acts as +a write-ahead-log for whatever ``Persistent`` messages a processor receives. This is also known as *command +sourcing*. Commands, however, may fail and some applications cannot tolerate command failures during recovery. + +For these applications `Event Sourcing`_ is a better choice. Applied to Akka persistence, the basic idea behind +event sourcing is quite simple. A processor receives a (non-persistent) command which is first validated if it +can be applied to the current state. Here, validation can mean anything, from simple inspection of a command +message's fields up to a conversation with several external services, for example. If validation succeeds, events +are generated from the command, representing the effect of the command. These events are then persisted and, after +successful persistence, used to change a processor's state. When the processor needs to be recovered, only the +persisted events are replayed of which we know that they can be successfully applied. In other words, events +cannot fail when being replayed to a processor, in contrast to commands. Eventsourced processors may of course +also process commands that do not change application state, such as query commands, for example. + +.. _Event Sourcing: http://martinfowler.com/eaaDev/EventSourcing.html + +Akka persistence supports event sourcing with the ``AbstractEventsourcedProcessor`` abstract class (which implements +event sourcing as a pattern on top of command sourcing). A processor that extends this abstract class does not handle +``Persistent`` messages directly but uses the ``persist`` method to persist and handle events. The behavior of an +``AbstractEventsourcedProcessor`` is defined by implementing ``receiveRecover`` and ``receiveCommand``. This is +demonstrated in the following example. + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/sample/persistence/EventsourcedExample.java#eventsourced-example + +The example defines two data types, ``Cmd`` and ``Evt`` to represent commands and events, respectively. The +``state`` of the ``ExampleProcessor`` is a list of persisted event data contained in ``ExampleState``. + +The processor's ``receiveRecover`` method defines how ``state`` is updated during recovery by handling ``Evt`` +and ``SnapshotOffer`` messages. The processor's ``receiveCommand`` method is a command handler. In this example, +a command is handled by generating two events which are then persisted and handled. Events are persisted by calling +``persist`` with an event (or a sequence of events) as first argument and an event handler as second argument. + +The ``persist`` method persists events asynchronously and the event handler is executed for successfully persisted +events. Successfully persisted events are internally sent back to the processor as individual messages that trigger +event handler executions. An event handler may close over processor state and mutate it. The sender of a persisted +event is the sender of the corresponding command. This allows event handlers to reply to the sender of a command +(not shown). + +The main responsibility of an event handler is changing processor state using event data and notifying others +about successful state changes by publishing events. + +When persisting events with ``persist`` it is guaranteed that the processor will not receive further commands between +the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist`` +calls in context of a single command. The example also shows how to switch between command different command handlers +with ``context().become()`` and ``context().unbecome()``. + +The easiest way to run this example yourself is to download `Typesafe Activator `_ +and open the tutorial named `Akka Persistence Samples with Java `_. +It contains instructions on how to run the ``EventsourcedExample``. + +Reliable event delivery +----------------------- + +Sending events from an event handler to another actor has at-most-once delivery semantics. For at-least-once delivery, +:ref:`channels-java-lambda` must be used. In this case, also replayed events (received by ``receiveRecover``) must be +sent to a channel, as shown in the following example: + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#reliable-event-delivery + +In larger integration scenarios, channel destinations may be actors that submit received events to an external +message broker, for example. After having successfully submitted an event, they should call ``confirm()`` on the +received ``ConfirmablePersistent`` message. + +Batch writes +============ + +To optimize throughput, an ``AbstractProcessor`` internally batches received ``Persistent`` messages under high load +before +writing them to the journal (as a single batch). The batch size dynamically grows from 1 under low and moderate loads +to a configurable maximum size (default is ``200``) under high load. + +.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#max-message-batch-size + +A new batch write is triggered by a processor as soon as a batch reaches the maximum size or if the journal completed +writing the previous batch. Batch writes are never timer-based which keeps latencies at a minimum. + +Applications that want to have more explicit control over batch writes and batch sizes can send processors +``PersistentBatch`` messages. + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java#batch-write + +``Persistent`` messages contained in a ``PersistentBatch`` are always written atomically, even if the batch +size is greater than ``max-message-batch-size``. Also, a ``PersistentBatch`` is written isolated from other batches. +``Persistent`` messages contained in a ``PersistentBatch`` are received individually by a processor. + +``PersistentBatch`` messages, for example, are used internally by an ``AbstractEventsourcedProcessor`` to ensure atomic +writes of events. All events that are persisted in context of a single command are written as a single batch to the +journal (even if ``persist`` is called multiple times per command). The recovery of an ``AbstractEventsourcedProcessor`` +will therefore never be done partially (with only a subset of events persisted by a single command). + +Confirmation and deletion operations performed by :ref:`channels-java-lambda` are also batched. The maximum +confirmation and deletion batch sizes are configurable with ``akka.persistence.journal.max-confirmation-batch-size`` +and ``akka.persistence.journal.max-deletion-batch-size``, respectively. + +Storage plugins +=============== + +Storage backends for journals and snapshot stores are pluggable in Akka persistence. The default journal plugin +writes messages to LevelDB (see :ref:`local-leveldb-journal-java-lambda`). The default snapshot store plugin writes +snapshots as individual files to the local filesystem (see :ref:`local-snapshot-store-java-lambda`). Applications can +provide their own plugins by implementing a plugin API and activate them by configuration. Plugin development +requires the following imports: + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistencePluginDocTest.java#plugin-imports + +Journal plugin API +------------------ + +A journal plugin either extends ``SyncWriteJournal`` or ``AsyncWriteJournal``. ``SyncWriteJournal`` is an +actor that should be extended when the storage backend API only supports synchronous, blocking writes. In this +case, the methods to be implemented are: + +.. includecode:: ../../../akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java#sync-write-plugin-api + +``AsyncWriteJournal`` is an actor that should be extended if the storage backend API supports asynchronous, +non-blocking writes. In this case, the methods to be implemented are: + +.. includecode:: ../../../akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java#async-write-plugin-api + +Message replays and sequence number recovery are always asynchronous, therefore, any journal plugin must implement: + +.. includecode:: ../../../akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncRecoveryPlugin.java#async-replay-plugin-api + +A journal plugin can be activated with the following minimal configuration: + +.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#journal-plugin-config + +The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher +used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher`` +for ``SyncWriteJournal`` plugins and ``akka.actor.default-dispatcher`` for ``AsyncWriteJournal`` plugins. + +Snapshot store plugin API +------------------------- + +A snapshot store plugin must extend the ``SnapshotStore`` actor and implement the following methods: + +.. includecode:: ../../../akka-persistence/src/main/java/akka/persistence/snapshot/japi/SnapshotStorePlugin.java#snapshot-store-plugin-api + +A snapshot store plugin can be activated with the following minimal configuration: + +.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#snapshot-store-plugin-config + +The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher +used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``. + +Pre-packaged plugins +==================== + +.. _local-leveldb-journal-java-lambda: + +Local LevelDB journal +--------------------- + +The default journal plugin is ``akka.persistence.journal.leveldb`` which writes messages to a local LevelDB +instance. The default location of the LevelDB files is a directory named ``journal`` in the current working +directory. This location can be changed by configuration where the specified path can be relative or absolute: + +.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#journal-config + +With this plugin, each actor system runs its own private LevelDB instance. + +.. _shared-leveldb-journal-java-lambda: + +Shared LevelDB journal +---------------------- + +A LevelDB instance can also be shared by multiple actor systems (on the same or on different nodes). This, for +example, allows processors to failover to a backup node and continue using the shared journal instance from the +backup node. + +.. warning:: + + A shared LevelDB instance is a single point of failure and should therefore only be used for testing + purposes. Highly-available, replicated journal are available as `Community plugins`_. + +A shared LevelDB instance is started by instantiating the ``SharedLeveldbStore`` actor. + +.. includecode:: code/docs/persistence/PersistencePluginDocTest.java#shared-store-creation + +By default, the shared instance writes journaled messages to a local directory named ``journal`` in the current +working directory. The storage location can be changed by configuration: + +.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#shared-store-config + +Actor systems that use a shared LevelDB store must activate the ``akka.persistence.journal.leveldb-shared`` +plugin. + +.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#shared-journal-config + +This plugin must be initialized by injecting the (remote) ``SharedLeveldbStore`` actor reference. Injection is +done by calling the ``SharedLeveldbJournal.setStore`` method with the actor reference as argument. + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistencePluginDocTest.java#shared-store-usage + +Internal journal commands (sent by processors) are buffered until injection completes. Injection is idempotent +i.e. only the first injection is used. + +.. _local-snapshot-store-java-lambda: + +Local snapshot store +-------------------- + +The default snapshot store plugin is ``akka.persistence.snapshot-store.local``. It writes snapshot files to +the local filesystem. The default storage location is a directory named ``snapshots`` in the current working +directory. This can be changed by configuration where the specified path can be relative or absolute: + +.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#snapshot-config + +Custom serialization +==================== + +Serialization of snapshots and payloads of ``Persistent`` messages is configurable with Akka's +:ref:`serialization-java` infrastructure. For example, if an application wants to serialize + +* payloads of type ``MyPayload`` with a custom ``MyPayloadSerializer`` and +* snapshots of type ``MySnapshot`` with a custom ``MySnapshotSerializer`` + +it must add + +.. includecode:: ../scala/code/docs/persistence/PersistenceSerializerDocSpec.scala#custom-serializer-config + +to the application configuration. If not specified, a default serializer is used. + +Testing +======= + +When running tests with LevelDB default settings in ``sbt``, make sure to set ``fork := true`` in your sbt project +otherwise, you'll see an ``UnsatisfiedLinkError``. Alternatively, you can switch to a LevelDB Java port by setting + +.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#native-config + +or + +.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#shared-store-native-config + +in your Akka configuration. The LevelDB Java port is for testing purposes only. diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 688211bc8d..182b88430d 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -4,6 +4,10 @@ Persistence ########### + +Java 8 lambda expressions are also supported now. (See section :ref:`persistence-lambda-java`) + + Akka persistence enables stateful actors to persist their internal state so that it can be recovered when an actor is started, restarted after a JVM crash or by a supervisor, or migrated in a cluster. The key concept behind Akka persistence is that only changes to an actor's internal state are persisted but never its current state directly @@ -13,6 +17,10 @@ changes to these actors from which they can rebuild internal state. This can be or starting from a snapshot which can dramatically reduce recovery times. Akka persistence also provides point-to-point communication channels with at-least-once message delivery semantics. +.. Lambda warning:: + + Java 8 lambda expressions are also supported now. (See section :ref:`persistence-lambda-java`) + .. warning:: This module is marked as **“experimental”** as of its introduction in Akka 2.3.0. We will continue to diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 5598b302d7..6e55bde0db 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -10,6 +10,7 @@ import scala.collection.immutable import akka.japi.{ Procedure, Util } import akka.persistence.JournalProtocol._ +import akka.actor.AbstractActor /** * INTERNAL API. @@ -282,3 +283,50 @@ abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Events */ def onReceiveCommand(msg: Any): Unit } + +/** + * Java API: compatible with lambda expressions (to be used with [[akka.japi.pf.ReceiveBuilder]]): + * command handler. Typically validates commands against current state (and/or by + * communication with other actors). On successful validation, one or more events are + * derived from a command and these events are then persisted by calling `persist`. + * Commands sent to event sourced processors must not be [[Persistent]] or + * [[PersistentBatch]] messages. In this case an `UnsupportedOperationException` is + * thrown by the processor. + */ +abstract class AbstractEventsourcedProcessor extends AbstractActor with EventsourcedProcessor { + /** + * Java API: asynchronously persists `event`. On successful persistence, `handler` is called with the + * persisted event. It is guaranteed that no new commands will be received by a processor + * between a call to `persist` and the execution of its `handler`. This also holds for + * multiple `persist` calls per received command. Internally, this is achieved by stashing new + * commands and unstashing them when the `event` has been persisted and handled. The stash used + * for that is an internal stash which doesn't interfere with the user stash inherited from + * [[UntypedProcessor]]. + * + * An event `handler` may close over processor state and modify it. The `getSender()` of a persisted + * event is the sender of the corresponding command. This means that one can reply to a command + * sender within an event `handler`. + * + * Within an event handler, applications usually update processor state using persisted event + * data, notify listeners and reply to command senders. + * + * If persistence of an event fails, the processor will be stopped. This can be customized by + * handling [[PersistenceFailure]] in [[receiveCommand]]. + * + * @param event event to be persisted. + * @param handler handler for each persisted `event` + */ + final def persist[A](event: A, handler: Procedure[A]): Unit = + persist(event)(event ⇒ handler(event)) + + /** + * Java API: asynchronously persists `events` in specified order. This is equivalent to calling + * `persist[A](event: A, handler: Procedure[A])` multiple times with the same `handler`, + * except that `events` are persisted atomically with this method. + * + * @param events events to be persisted. + * @param handler handler for each persisted `events` + */ + final def persist[A](events: JIterable[A], handler: Procedure[A]): Unit = + persist(Util.immutableSeq(events))(event ⇒ handler(event)) +} \ No newline at end of file diff --git a/akka-persistence/src/main/scala/akka/persistence/Processor.scala b/akka-persistence/src/main/scala/akka/persistence/Processor.scala index 9366663351..86e42d18f5 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Processor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Processor.scala @@ -386,3 +386,62 @@ case class RecoveryException(message: String, cause: Throwable) extends AkkaExce * @see [[PersistentBatch]] */ abstract class UntypedProcessor extends UntypedActor with Processor + +/** + * Java API: compatible with lambda expressions (to be used with [[akka.japi.pf.ReceiveBuilder]]). + * An actor that persists (journals) messages of type [[Persistent]]. Messages of other types + * are not persisted. + * + * {{{ + * import akka.persistence.AbstractProcessor; + * import akka.persistence.Persistent; + * import akka.actor.ActorRef; + * import akka.actor.Props; + * import akka.japi.pf.ReceiveBuilder; + * import scala.PartialFunction; + * import scala.runtime.BoxedUnit; + * + * class MyProcessor extends AbstractProcessor { + * public PartialFunction receive() { + * return ReceiveBuilder. + * match(Persistent.class, p -> { + * Object payload = p.payload(); + * Long sequenceNr = p.sequenceNr(); + * // ... + * }).build(); + * } + * } + * + * // ... + * + * ActorRef processor = context().actorOf(Props.create(MyProcessor.class), "myProcessor"); + * + * processor.tell(Persistent.create("foo"), null); + * processor.tell("bar", null); + * }}} + * + * During start and restart, persistent messages are replayed to a processor so that it can recover internal + * state from these messages. New messages sent to a processor during recovery do not interfere with replayed + * messages, hence applications don't need to wait for a processor to complete its recovery. + * + * Automated recovery can be turned off or customized by overriding the [[preStart]] and [[preRestart]] life + * cycle hooks. If automated recovery is turned off, an application can explicitly recover a processor by + * sending it a [[Recover]] message. + * + * [[Persistent]] messages are assigned sequence numbers that are generated on a per-processor basis. A sequence + * starts at `1L` and doesn't contain gaps unless a processor (logically) deletes a message. + * + * During recovery, a processor internally buffers new messages until recovery completes, so that new messages + * do not interfere with replayed messages. This internal buffer (the ''processor stash'') is isolated from the + * ''user stash'' inherited by `akka.actor.Stash`. `Processor` implementation classes can therefore use the + * ''user stash'' for stashing/unstashing both persistent and transient messages. + * + * Processors can also store snapshots of internal state by calling [[saveSnapshot]]. During recovery, a saved + * snapshot is offered to the processor with a [[SnapshotOffer]] message, followed by replayed messages, if any, + * that are younger than the snapshot. Default is to offer the latest saved snapshot. + * + * @see [[Processor]] + * @see [[Recover]] + * @see [[PersistentBatch]] + */ +abstract class AbstractProcessor extends AbstractActor with Processor diff --git a/akka-persistence/src/main/scala/akka/persistence/View.scala b/akka-persistence/src/main/scala/akka/persistence/View.scala index c2632915ca..63e5e34133 100644 --- a/akka-persistence/src/main/scala/akka/persistence/View.scala +++ b/akka-persistence/src/main/scala/akka/persistence/View.scala @@ -197,3 +197,10 @@ trait View extends Actor with Recovery { * @see [[View]] */ abstract class UntypedView extends UntypedActor with View + +/** + * Java API: compatible with lambda expressions (to be used with [[akka.japi.pf.ReceiveBuilder]]) + * + * @see [[View]] + */ +abstract class AbstractView extends AbstractActor with View diff --git a/akka-samples/akka-sample-persistence-java8/pom.xml b/akka-samples/akka-sample-persistence-java8/pom.xml new file mode 100644 index 0000000000..666f4478b7 --- /dev/null +++ b/akka-samples/akka-sample-persistence-java8/pom.xml @@ -0,0 +1,59 @@ + + 4.0.0 + + + UTF-8 + + + sample + akka-sample-persistence-java8 + jar + 0.0.1-SNAPSHOT + + + + com.typesafe.akka + akka-actor_2.10 + 2.3-SNAPSHOT + + + com.typesafe.akka + akka-persistence-experimental_2.10 + 2.3-SNAPSHOT + + + com.typesafe.akka + akka-testkit_2.10 + 2.3-SNAPSHOT + + + junit + junit + 4.11 + test + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + 1.8 + 1.8 + true + + -Xlint + + + + + + + + diff --git a/akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java b/akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java new file mode 100644 index 0000000000..9bed488c8b --- /dev/null +++ b/akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistenceDocTest.java @@ -0,0 +1,389 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package doc; + +import java.util.concurrent.TimeUnit; + +import akka.japi.pf.ReceiveBuilder; +import scala.Option; +import scala.PartialFunction; +import scala.concurrent.duration.Duration; + +import akka.actor.*; +import akka.persistence.*; +import scala.runtime.BoxedUnit; + +import static java.util.Arrays.asList; + +public class LambdaPersistenceDocTest { + + public interface ProcessorMethods { + //#processor-id + public String processorId(); + //#processor-id + //#recovery-status + public boolean recoveryRunning(); + public boolean recoveryFinished(); + //#recovery-status + //#current-message + public Persistent getCurrentPersistentMessage(); + //#current-message + } + + static Object o1 = new Object() { + //#definition + class MyProcessor extends AbstractProcessor { + @Override public PartialFunction receive() { + return ReceiveBuilder. + match(Persistent.class, p -> { + // message successfully written to journal + Object payload = p.payload(); + Long sequenceNr = p.sequenceNr(); + // ... + }). + match(PersistenceFailure.class, failure -> { + // message failed to be written to journal + Object payload = failure.payload(); + Long sequenceNr = failure.sequenceNr(); + Throwable cause = failure.cause(); + // ... + }). + matchAny(otherwise -> { + // message not written to journal + }).build(); + } + } + //#definition + + class MyActor extends AbstractActor { + ActorRef processor; + + public MyActor() { + //#usage + processor = context().actorOf(Props.create(MyProcessor.class), "myProcessor"); + + processor.tell(Persistent.create("foo"), null); + processor.tell("bar", null); + //#usage + } + + @Override public PartialFunction receive() { + return ReceiveBuilder. + match(Persistent.class, received -> {/* ... */}).build(); + } + + private void recover() { + //#recover-explicit + processor.tell(Recover.create(), null); + //#recover-explicit + } + } + }; + + static Object o2 = new Object() { + abstract class MyProcessor1 extends AbstractProcessor { + //#recover-on-start-disabled + @Override + public void preStart() {} + //#recover-on-start-disabled + + //#recover-on-restart-disabled + @Override + public void preRestart(Throwable reason, Option message) {} + //#recover-on-restart-disabled + } + + abstract class MyProcessor2 extends AbstractProcessor { + //#recover-on-start-custom + @Override + public void preStart() { + self().tell(Recover.create(457L), null); + } + //#recover-on-start-custom + } + + abstract class MyProcessor3 extends AbstractProcessor { + //#deletion + @Override + public void preRestart(Throwable reason, Option message) { + if (message.isDefined() && message.get() instanceof Persistent) { + deleteMessage(((Persistent) message.get()).sequenceNr()); + } + super.preRestart(reason, message); + } + //#deletion + } + + class MyProcessor4 extends AbstractProcessor implements ProcessorMethods { + //#processor-id-override + @Override + public String processorId() { + return "my-stable-processor-id"; + } + + //#processor-id-override + @Override public PartialFunction receive() { + return ReceiveBuilder. + match(Persistent.class, received -> {/* ... */}).build(); + } + } + }; + + static Object o3 = new Object() { + //#channel-example + class MyProcessor extends AbstractProcessor { + private final ActorRef destination; + private final ActorRef channel; + + public MyProcessor() { + this.destination = context().actorOf(Props.create(MyDestination.class)); + this.channel = context().actorOf(Channel.props(), "myChannel"); + } + + @Override public PartialFunction receive() { + return ReceiveBuilder. + match(Persistent.class, p -> { + Persistent out = p.withPayload("done " + p.payload()); + channel.tell(Deliver.create(out, destination.path()), self()); + }).build(); + } + } + + class MyDestination extends AbstractActor { + @Override public PartialFunction receive() { + return ReceiveBuilder. + match(ConfirmablePersistent.class, p -> { + Object payload = p.payload(); + Long sequenceNr = p.sequenceNr(); + int redeliveries = p.redeliveries(); + // ... + p.confirm(); + }).build(); + } + } + //#channel-example + + class MyProcessor2 extends AbstractProcessor { + private final ActorRef destination; + private final ActorRef channel; + + public MyProcessor2(ActorRef destination) { + this.destination = context().actorOf(Props.create(MyDestination.class)); + //#channel-id-override + this.channel = context().actorOf(Channel.props("my-stable-channel-id")); + //#channel-id-override + + //#channel-custom-settings + context().actorOf( + Channel.props(ChannelSettings.create() + .withRedeliverInterval(Duration.create(30, TimeUnit.SECONDS)) + .withRedeliverMax(15))); + //#channel-custom-settings + + //#channel-custom-listener + class MyListener extends AbstractActor { + @Override public PartialFunction receive() { + return ReceiveBuilder. + match(RedeliverFailure.class, r -> { + Iterable messages = r.getMessages(); + // ... + }).build(); + } + } + + final ActorRef myListener = context().actorOf(Props.create(MyListener.class)); + context().actorOf(Channel.props( + ChannelSettings.create().withRedeliverFailureListener(null))); + //#channel-custom-listener + + } + + @Override public PartialFunction receive() { + return ReceiveBuilder. + match(Persistent.class, p -> { + Persistent out = p.withPayload("done " + p.payload()); + channel.tell(Deliver.create(out, destination.path()), self()); + + //#channel-example-reply + channel.tell(Deliver.create(out, sender().path()), self()); + //#channel-example-reply + }).build(); + } + } + }; + + static Object o4 = new Object() { + //#save-snapshot + class MyProcessor extends AbstractProcessor { + private Object state; + + @Override public PartialFunction receive() { + return ReceiveBuilder. + match(String.class, s -> s.equals("snap"), + s -> saveSnapshot(state)). + match(SaveSnapshotSuccess.class, ss -> { + SnapshotMetadata metadata = ss.metadata(); + // ... + }). + match(SaveSnapshotFailure.class, sf -> { + SnapshotMetadata metadata = sf.metadata(); + // ... + }).build(); + } + } + //#save-snapshot + }; + + static Object o5 = new Object() { + //#snapshot-offer + class MyProcessor extends AbstractProcessor { + private Object state; + + @Override public PartialFunction receive() { + return ReceiveBuilder. + match(SnapshotOffer.class, s -> { + state = s.snapshot(); + // ... + }). + match(Persistent.class, p -> {/* ...*/}).build(); + } + } + //#snapshot-offer + + class MyActor extends AbstractActor { + ActorRef processor; + + public MyActor() { + processor = context().actorOf(Props.create(MyProcessor.class)); + } + + @Override public PartialFunction receive() { + return ReceiveBuilder.match(Object.class, o -> {/* ... */}).build(); + } + + private void recover() { + //#snapshot-criteria + processor.tell(Recover.create( + SnapshotSelectionCriteria + .create(457L, System.currentTimeMillis())), null); + //#snapshot-criteria + } + } + }; + + static Object o6 = new Object() { + //#batch-write + class MyProcessor extends AbstractProcessor { + @Override public PartialFunction receive() { + return ReceiveBuilder. + match(Persistent.class, p -> p.payload().equals("a"), + p -> {/* ... */}). + match(Persistent.class, p -> p.payload().equals("b"), + p -> {/* ... */}).build(); + } + } + + class Example { + final ActorSystem system = ActorSystem.create("example"); + final ActorRef processor = system.actorOf(Props.create(MyProcessor.class)); + + public void batchWrite() { + processor.tell(PersistentBatch + .create(asList(Persistent.create("a"), + Persistent.create("b"))), null); + } + + // ... + } + //#batch-write + }; + + static Object o7 = new Object() { + abstract class MyProcessor extends AbstractProcessor { + ActorRef destination; + + public void foo() { + //#persistent-channel-example + final ActorRef channel = context().actorOf( + PersistentChannel.props( + PersistentChannelSettings.create() + .withRedeliverInterval(Duration.create(30, TimeUnit.SECONDS)) + .withRedeliverMax(15)), + "myPersistentChannel"); + + channel.tell(Deliver.create(Persistent.create("example"), destination.path()), self()); + //#persistent-channel-example + //#persistent-channel-watermarks + PersistentChannelSettings.create() + .withPendingConfirmationsMax(10000) + .withPendingConfirmationsMin(2000); + //#persistent-channel-watermarks + //#persistent-channel-reply + PersistentChannelSettings.create().withReplyPersistent(true); + //#persistent-channel-reply + } + } + }; + + static Object o8 = new Object() { + //#reliable-event-delivery + class MyEventsourcedProcessor extends AbstractEventsourcedProcessor { + private ActorRef destination; + private ActorRef channel; + + public MyEventsourcedProcessor(ActorRef destination) { + this.destination = destination; + this.channel = context().actorOf(Channel.props(), "channel"); + } + + private void handleEvent(String event) { + // update state + // ... + // reliably deliver events + channel.tell(Deliver.create( + Persistent.create(event, getCurrentPersistentMessage()), + destination.path()), self()); + } + + @Override public PartialFunction receiveRecover() { + return ReceiveBuilder. + match(String.class, this::handleEvent).build(); + } + + @Override public PartialFunction receiveCommand() { + return ReceiveBuilder. + match(String.class, s -> s.equals("cmd"), + s -> persist("evt", this::handleEvent)).build(); + } + } + //#reliable-event-delivery + }; + + static Object o9 = new Object() { + //#view + class MyView extends AbstractView { + @Override + public String processorId() { + return "some-processor-id"; + } + + @Override public PartialFunction receive() { + return ReceiveBuilder. + match(Persistent.class, peristent -> { + // ... + }).build(); + } + } + //#view + + public void usage() { + final ActorSystem system = ActorSystem.create("example"); + //#view-update + final ActorRef view = system.actorOf(Props.create(MyView.class)); + view.tell(Update.create(true), null); + //#view-update + } + }; +} diff --git a/akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistencePluginDocTest.java b/akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistencePluginDocTest.java new file mode 100644 index 0000000000..8cb5e4de43 --- /dev/null +++ b/akka-samples/akka-sample-persistence-java8/src/main/java/doc/LambdaPersistencePluginDocTest.java @@ -0,0 +1,113 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package doc; + +//#plugin-imports +import akka.japi.pf.ReceiveBuilder; +import scala.PartialFunction; +import scala.concurrent.Future; +import akka.japi.Option; +import akka.japi.Procedure; +import akka.persistence.*; +import akka.persistence.journal.japi.*; +import akka.persistence.snapshot.japi.*; +//#plugin-imports +import akka.actor.*; +import akka.persistence.journal.leveldb.SharedLeveldbJournal; +import akka.persistence.journal.leveldb.SharedLeveldbStore; +import scala.runtime.BoxedUnit; + +public class LambdaPersistencePluginDocTest { + + + static Object o1 = new Object() { + final ActorSystem system = null; + //#shared-store-creation + final ActorRef store = system.actorOf(Props.create(SharedLeveldbStore.class), "store"); + //#shared-store-creation + + //#shared-store-usage + class SharedStorageUsage extends AbstractActor { + @Override + public void preStart() throws Exception { + String path = "akka.tcp://example@127.0.0.1:2552/user/store"; + ActorSelection selection = context().actorSelection(path); + selection.tell(new Identify(1), self()); + } + + @Override public PartialFunction receive() { + return ReceiveBuilder. + match(ActorIdentity.class, ai -> { + if (ai.correlationId().equals(1)) { + ActorRef store = ai.getRef(); + if (store != null) { + SharedLeveldbJournal.setStore(store, context().system()); + } + } + }).build(); + } + } + //#shared-store-usage + }; + + class MySnapshotStore extends SnapshotStore { + @Override + public Future> doLoadAsync(String processorId, SnapshotSelectionCriteria criteria) { + return null; + } + + @Override + public Future doSaveAsync(SnapshotMetadata metadata, Object snapshot) { + return null; + } + + @Override + public void onSaved(SnapshotMetadata metadata) throws Exception { + } + + @Override + public void doDelete(SnapshotMetadata metadata) throws Exception { + } + + @Override + public void doDelete(String processorId, SnapshotSelectionCriteria criteria) throws Exception { + } + } + + class MyAsyncJournal extends AsyncWriteJournal { + @Override + public Future doAsyncWriteMessages(Iterable messages) { + return null; + } + + @Override + public Future doAsyncWriteConfirmations(Iterable confirmations) { + return null; + } + + @Override + public Future doAsyncDeleteMessages(Iterable messageIds, boolean permanent) { + return null; + } + + @Override + public Future doAsyncDeleteMessagesTo(String processorId, long toSequenceNr, boolean permanent) { + return null; + } + + @Override + public Future doAsyncReplayMessages(String processorId, long fromSequenceNr, + long toSequenceNr, + long max, + Procedure replayCallback) { + return null; + } + + @Override + public Future doAsyncReadHighestSequenceNr(String processorId, long fromSequenceNr) { + return null; + } + } +} diff --git a/akka-samples/akka-sample-persistence-java8/src/main/java/sample/persistence/ConversationRecoveryExample.java b/akka-samples/akka-sample-persistence-java8/src/main/java/sample/persistence/ConversationRecoveryExample.java new file mode 100644 index 0000000000..50e59c641b --- /dev/null +++ b/akka-samples/akka-sample-persistence-java8/src/main/java/sample/persistence/ConversationRecoveryExample.java @@ -0,0 +1,74 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package sample.persistence; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.japi.pf.ReceiveBuilder; +import akka.persistence.*; +import scala.PartialFunction; +import scala.runtime.BoxedUnit; + +public class ConversationRecoveryExample { + public static String PING = "PING"; + public static String PONG = "PONG"; + + public static class Ping extends AbstractProcessor { + final ActorRef pongChannel = context().actorOf(Channel.props(), "pongChannel"); + int counter = 0; + + @Override public PartialFunction receive() { + return ReceiveBuilder. + match(ConfirmablePersistent.class, cp -> cp.payload().equals(PING), cp -> { + counter += 1; + System.out.println(String.format("received ping %d times", counter)); + cp.confirm(); + if (!recoveryRunning()) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + pongChannel.tell(Deliver.create(cp.withPayload(PONG), sender().path()), self()); + }). + match(String.class, + s -> s.equals("init"), + s -> pongChannel.tell(Deliver.create(Persistent.create(PONG), sender().path()), self())).build(); + } + } + + public static class Pong extends AbstractProcessor { + private final ActorRef pingChannel = context().actorOf(Channel.props(), "pingChannel"); + private int counter = 0; + + @Override public PartialFunction receive() { + return ReceiveBuilder. + match(ConfirmablePersistent.class, cp -> cp.payload().equals(PONG), cp -> { + counter += 1; + System.out.println(String.format("received pong %d times", counter)); + cp.confirm(); + if (!recoveryRunning()) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + pingChannel.tell(Deliver.create(cp.withPayload(PING), sender().path()), self()); + }).build(); + } + } + + public static void main(String... args) throws Exception { + final ActorSystem system = ActorSystem.create("example"); + + final ActorRef ping = system.actorOf(Props.create(Ping.class), "ping"); + final ActorRef pong = system.actorOf(Props.create(Pong.class), "pong"); + + ping.tell("init", pong); + } +} diff --git a/akka-samples/akka-sample-persistence-java8/src/main/java/sample/persistence/EventsourcedExample.java b/akka-samples/akka-sample-persistence-java8/src/main/java/sample/persistence/EventsourcedExample.java new file mode 100644 index 0000000000..48e17d1446 --- /dev/null +++ b/akka-samples/akka-sample-persistence-java8/src/main/java/sample/persistence/EventsourcedExample.java @@ -0,0 +1,132 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package sample.persistence; + +//#eventsourced-example + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.japi.pf.ReceiveBuilder; +import akka.persistence.AbstractEventsourcedProcessor; +import akka.persistence.SnapshotOffer; +import scala.PartialFunction; +import scala.runtime.BoxedUnit; + +import java.io.Serializable; +import java.util.ArrayList; + +import static java.util.Arrays.asList; + +class Cmd implements Serializable { + private final String data; + + public Cmd(String data) { + this.data = data; + } + + public String getData() { + return data; + } +} + +class Evt implements Serializable { + private final String data; + + public Evt(String data) { + this.data = data; + } + + public String getData() { + return data; + } +} + +class ExampleState implements Serializable { + private final ArrayList events; + + public ExampleState() { + this(new ArrayList()); + } + + public ExampleState(ArrayList events) { + this.events = events; + } + + public ExampleState copy() { + return new ExampleState(new ArrayList(events)); + } + + public void update(Evt evt) { + events.add(evt.getData()); + } + + public int size() { + return events.size(); + } + + @Override + public String toString() { + return events.toString(); + } +} + +class ExampleProcessor extends AbstractEventsourcedProcessor { + private ExampleState state = new ExampleState(); + + public int getNumEvents() { + return state.size(); + } + + @Override public PartialFunction receiveRecover() { + return ReceiveBuilder. + match(Evt.class, state::update). + match(SnapshotOffer.class, ss -> state = (ExampleState) ss.snapshot()).build(); + } + + @Override public PartialFunction receiveCommand() { + return ReceiveBuilder.match(Cmd.class, c -> { + final String data = c.getData(); + final Evt evt1 = new Evt(data + "-" + getNumEvents()); + final Evt evt2 = new Evt(data + "-" + (getNumEvents() + 1)); + persist(asList(evt1, evt2), (Evt evt) -> { + state.update(evt); + if (evt.equals(evt2)) { + context().system().eventStream().publish(evt); + if (data.equals("foo")) { context().become(och, true); } + } + }); + }). + match(String.class, s -> s.equals("snap"), s -> saveSnapshot(state.copy())). + match(String.class, s -> s.equals("print"), s -> System.out.println(state)).build(); + } + + PartialFunction och = ReceiveBuilder. + match(Cmd.class, cmd -> cmd.getData().equals("bar"), cmd -> { + persist(new Evt("bar-" + getNumEvents()), event -> { + state.update(event); + context().unbecome(); + }); + unstashAll(); + }). + matchAny(o -> stash()).build(); +} +//#eventsourced-example + +public class EventsourcedExample { + public static void main(String... args) throws Exception { + final ActorSystem system = ActorSystem.create("example"); + final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class), "processor-4-java8"); + processor.tell(new Cmd("foo"), null); + processor.tell(new Cmd("baz"), null); + processor.tell(new Cmd("bar"), null); + processor.tell("snap", null); + processor.tell(new Cmd("buzz"), null); + processor.tell("print", null); + + Thread.sleep(1000); + system.shutdown(); + } +} diff --git a/akka-samples/akka-sample-persistence-java8/src/main/java/sample/persistence/ProcessorChannelExample.java b/akka-samples/akka-sample-persistence-java8/src/main/java/sample/persistence/ProcessorChannelExample.java new file mode 100644 index 0000000000..1982413125 --- /dev/null +++ b/akka-samples/akka-sample-persistence-java8/src/main/java/sample/persistence/ProcessorChannelExample.java @@ -0,0 +1,58 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package sample.persistence; + +import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.japi.pf.ReceiveBuilder; +import akka.persistence.*; +import scala.PartialFunction; +import scala.runtime.BoxedUnit; + +public class ProcessorChannelExample { + public static class ExampleProcessor extends AbstractProcessor { + private ActorRef destination; + private ActorRef channel; + + public ExampleProcessor(ActorRef destination) { + this.destination = destination; + this.channel = context().actorOf(Channel.props(), "channel"); + } + + @Override public PartialFunction receive() { + return ReceiveBuilder. + match(Persistent.class, p -> { + System.out.println("processed " + p.payload()); + channel.tell(Deliver.create(p.withPayload("processed " + p.payload()), destination.path()), self()); + }). + match(String.class, s -> System.out.println("reply = " + s)).build(); + } + } + + public static class ExampleDestination extends AbstractActor { + @Override public PartialFunction receive() { + return ReceiveBuilder. + match(ConfirmablePersistent.class, cp -> { + System.out.println("received " + cp.payload()); + sender().tell(String.format("re: %s (%d)", cp.payload(), cp.sequenceNr()), null); + cp.confirm(); + }).build(); + } + } + + public static void main(String... args) throws Exception { + final ActorSystem system = ActorSystem.create("example"); + final ActorRef destination = system.actorOf(Props.create(ExampleDestination.class)); + final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class, destination), "processor-1"); + + processor.tell(Persistent.create("a"), null); + processor.tell(Persistent.create("b"), null); + + Thread.sleep(1000); + system.shutdown(); + } +} diff --git a/akka-samples/akka-sample-persistence-java8/src/main/java/sample/persistence/ProcessorFailureExample.java b/akka-samples/akka-sample-persistence-java8/src/main/java/sample/persistence/ProcessorFailureExample.java new file mode 100644 index 0000000000..9b40a6f479 --- /dev/null +++ b/akka-samples/akka-sample-persistence-java8/src/main/java/sample/persistence/ProcessorFailureExample.java @@ -0,0 +1,72 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package sample.persistence; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.japi.pf.ReceiveBuilder; +import akka.persistence.AbstractProcessor; +import akka.persistence.Persistent; +import scala.Option; +import scala.PartialFunction; +import scala.runtime.BoxedUnit; + +import java.util.ArrayList; + +public class ProcessorFailureExample { + public static class ExampleProcessor extends AbstractProcessor { + private ArrayList received = new ArrayList(); + + @Override public PartialFunction receive() { + return ReceiveBuilder. + match(Persistent.class, p -> p.payload().equals("boom"), p -> {throw new RuntimeException("boom");}). + match(Persistent.class, p -> !p.payload().equals("boom"), p -> received.add(p.payload())). + match(String.class, s -> s.equals("boom"), s -> {throw new RuntimeException("boom");}). + match(String.class, s -> s.equals("print"), s -> System.out.println("received " + received)).build(); + } + + @Override + public void preRestart(Throwable reason, Option message) { + if (message.isDefined() && message.get() instanceof Persistent) { + deleteMessage(((Persistent) message.get()).sequenceNr(), false); + } + super.preRestart(reason, message); + } + } + + public static void main(String... args) throws Exception { + final ActorSystem system = ActorSystem.create("example"); + final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class), "processor-2"); + + processor.tell(Persistent.create("a"), null); + processor.tell("print", null); + processor.tell("boom", null); + processor.tell("print", null); + processor.tell(Persistent.create("b"), null); + processor.tell("print", null); + processor.tell(Persistent.create("boom"), null); + processor.tell("print", null); + processor.tell(Persistent.create("c"), null); + processor.tell("print", null); + + // Will print in a first run (i.e. with empty journal): + + // received [a] + // received [a, b] + // received [a, b, c] + + // Will print in a second run: + + // received [a, b, c, a] + // received [a, b, c, a, b] + // received [a, b, c, a, b, c] + + // etc ... + + Thread.sleep(1000); + system.shutdown(); + } +} diff --git a/akka-samples/akka-sample-persistence-java8/src/main/java/sample/persistence/SnapshotExample.java b/akka-samples/akka-sample-persistence-java8/src/main/java/sample/persistence/SnapshotExample.java new file mode 100644 index 0000000000..e62cc0bdd4 --- /dev/null +++ b/akka-samples/akka-sample-persistence-java8/src/main/java/sample/persistence/SnapshotExample.java @@ -0,0 +1,79 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package sample.persistence; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.japi.pf.ReceiveBuilder; +import akka.persistence.AbstractProcessor; +import akka.persistence.Persistent; +import akka.persistence.SnapshotOffer; +import scala.PartialFunction; +import scala.runtime.BoxedUnit; + +import java.io.Serializable; +import java.util.ArrayList; + +public class SnapshotExample { + public static class ExampleState implements Serializable { + private final ArrayList received; + + public ExampleState() { + this(new ArrayList()); + } + + public ExampleState(ArrayList received) { + this.received = received; + } + + public ExampleState copy() { + return new ExampleState(new ArrayList(received)); + } + + public void update(String s) { + received.add(s); + } + + @Override + public String toString() { + return received.toString(); + } + } + + public static class ExampleProcessor extends AbstractProcessor { + private ExampleState state = new ExampleState(); + + @Override public PartialFunction receive() { + return ReceiveBuilder. + match(Persistent.class, p -> state.update(String.format("%s-%d", p.payload(), p.sequenceNr()))). + match(SnapshotOffer.class, s -> { + ExampleState exState = (ExampleState) s.snapshot(); + System.out.println("offered state = " + exState); + state = exState; + }). + match(String.class, s -> s.equals("print"), s -> System.out.println("current state = " + state)). + match(String.class, s -> s.equals("snap"), s -> + // IMPORTANT: create a copy of snapshot + // because ExampleState is mutable !!! + saveSnapshot(state.copy())).build(); + } + } + + public static void main(String... args) throws Exception { + final ActorSystem system = ActorSystem.create("example"); + final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class), "processor-3-java"); + + processor.tell(Persistent.create("a"), null); + processor.tell(Persistent.create("b"), null); + processor.tell("snap", null); + processor.tell(Persistent.create("c"), null); + processor.tell(Persistent.create("d"), null); + processor.tell("print", null); + + Thread.sleep(1000); + system.shutdown(); + } +} diff --git a/akka-samples/akka-sample-persistence-java8/src/main/java/sample/persistence/ViewExample.java b/akka-samples/akka-sample-persistence-java8/src/main/java/sample/persistence/ViewExample.java new file mode 100644 index 0000000000..eb28d94063 --- /dev/null +++ b/akka-samples/akka-sample-persistence-java8/src/main/java/sample/persistence/ViewExample.java @@ -0,0 +1,100 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package sample.persistence; + +import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.japi.pf.ReceiveBuilder; +import akka.persistence.*; +import scala.PartialFunction; +import scala.concurrent.duration.Duration; +import scala.runtime.BoxedUnit; + +import java.util.concurrent.TimeUnit; + +public class ViewExample { + public static class ExampleProcessor extends AbstractProcessor { + @Override + public String processorId() { + return "processor-5"; + } + + @Override public PartialFunction receive() { + return ReceiveBuilder. + match(Persistent.class, + p -> System.out.println(String.format("processor received %s (sequence nr = %d)", + p.payload(), + p.sequenceNr()))).build(); + } + } + + public static class ExampleView extends AbstractView { + private final ActorRef destination = context().actorOf(Props.create(ExampleDestination.class)); + private final ActorRef channel = context().actorOf(Channel.props("channel")); + + private int numReplicated = 0; + + @Override + public String viewId() { + return "view-5"; + } + + @Override + public String processorId() { + return "processor-5"; + } + + @Override public PartialFunction receive() { + return ReceiveBuilder. + match(Persistent.class, p -> { + numReplicated += 1; + System.out.println(String.format("view received %s (sequence nr = %d, num replicated = %d)", + p.payload(), + p.sequenceNr(), + numReplicated)); + channel.tell(Deliver.create(p.withPayload("replicated-" + p.payload()), destination.path()), + self()); + }). + match(SnapshotOffer.class, so -> { + numReplicated = (Integer) so.snapshot(); + System.out.println(String.format("view received snapshot offer %s (metadata = %s)", + numReplicated, + so.metadata())); + }). + match(String.class, s -> s.equals("snap"), s -> saveSnapshot(numReplicated)).build(); + } + } + + public static class ExampleDestination extends AbstractActor { + + @Override public PartialFunction receive() { + return ReceiveBuilder. + match(ConfirmablePersistent.class, cp -> { + System.out.println(String.format("destination received %s (sequence nr = %s)", + cp.payload(), + cp.sequenceNr())); + cp.confirm(); + }).build(); + } + } + + public static void main(String... args) throws Exception { + final ActorSystem system = ActorSystem.create("example"); + final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class)); + final ActorRef view = system.actorOf(Props.create(ExampleView.class)); + + system.scheduler() + .schedule(Duration.Zero(), + Duration.create(2, TimeUnit.SECONDS), + processor, + Persistent.create("scheduled"), + system.dispatcher(), + null); + system.scheduler() + .schedule(Duration.Zero(), Duration.create(5, TimeUnit.SECONDS), view, "snap", system.dispatcher(), null); + } +} diff --git a/akka-samples/akka-sample-persistence-java8/src/main/resources/application.conf b/akka-samples/akka-sample-persistence-java8/src/main/resources/application.conf new file mode 100644 index 0000000000..da5be9b92c --- /dev/null +++ b/akka-samples/akka-sample-persistence-java8/src/main/resources/application.conf @@ -0,0 +1,6 @@ +akka.persistence.journal.leveldb.dir = "target/example/journal" +akka.persistence.snapshot-store.local.dir = "target/example/snapshots" + +# DO NOT USE THIS IN PRODUCTION !!! +# See also https://github.com/typesafehub/activator/issues/287 +akka.persistence.journal.leveldb.native = false