akka-persistence prototype
The most prominent changes compared to eventsourced are: - No central processor and channel registry any more - Auto-recovery of processors on start and restart (can be disabled) - Recovery of processor networks doesn't require coordination - Explicit channel activation not needed any more - Message sequence numbers generated per processor (no gaps) - Sender references are journaled along with messages - Processors can determine their recovery status - No custom API on extension object, only messages - Journal created by extension from config, not by application - Applications only interact with processors and channels via messages - Internal design prepared for having processor-specific journal actors (for later optimization possibilities) Further additions and changes during review: - Allow processor implementation classes to use inherited stash - Channel support to resolve (potentially invalid) sender references - Logical intead of physical deletion of messages - Pinned dispatcher for LevelDB journal - Processor can handle failures during recovery - Message renamed to Persistent This prototype has the following limitations: - Serialization of persistent messages and their payload via JavaSerializer only (will be configurable later) - The LevelDB journal implementation based on a LevelDB Java port, not the native LevelDB (will be configurable later) The following features will be added later using separate tickets: - Snapshot-based recovery - Reliable channels - Journal plugin API - Optimizations - ...
This commit is contained in:
parent
1187fecfcc
commit
cdeea924ff
28 changed files with 3119 additions and 8 deletions
|
|
@ -469,6 +469,16 @@ trait Actor {
|
|||
def receive: Actor.Receive
|
||||
//#receive
|
||||
|
||||
/**
|
||||
* INTERNAL API.
|
||||
*
|
||||
* Can be overridden to intercept calls to this actor's current behavior.
|
||||
*
|
||||
* @param receive current behavior.
|
||||
* @param msg current message.
|
||||
*/
|
||||
protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = receive.applyOrElse(msg, unhandled)
|
||||
|
||||
/**
|
||||
* User overridable definition the strategy to use for supervising
|
||||
* child actors.
|
||||
|
|
|
|||
|
|
@ -495,7 +495,7 @@ private[akka] class ActorCell(
|
|||
}
|
||||
}
|
||||
|
||||
final def receiveMessage(msg: Any): Unit = behaviorStack.head.applyOrElse(msg, actor.unhandled)
|
||||
final def receiveMessage(msg: Any): Unit = actor.aroundReceive(behaviorStack.head, msg)
|
||||
|
||||
/*
|
||||
* ACTOR CONTEXT IMPLEMENTATION
|
||||
|
|
|
|||
|
|
@ -77,10 +77,13 @@ trait UnrestrictedStash extends Actor {
|
|||
config.getInt("stash-capacity")
|
||||
}
|
||||
|
||||
/* The actor's deque-based message queue.
|
||||
/**
|
||||
* INTERNAL API.
|
||||
*
|
||||
* The actor's deque-based message queue.
|
||||
* `mailbox.queue` is the underlying `Deque`.
|
||||
*/
|
||||
private val mailbox: DequeBasedMessageQueueSemantics = {
|
||||
protected[akka] val mailbox: DequeBasedMessageQueueSemantics = {
|
||||
context.asInstanceOf[ActorCell].mailbox.messageQueue match {
|
||||
case queue: DequeBasedMessageQueueSemantics ⇒ queue
|
||||
case other ⇒ throw ActorInitializationException(self, s"DequeBasedMailbox required, got: ${other.getClass.getName}\n" +
|
||||
|
|
@ -116,9 +119,26 @@ trait UnrestrictedStash extends Actor {
|
|||
*
|
||||
* The stash is guaranteed to be empty after calling `unstashAll()`.
|
||||
*/
|
||||
def unstashAll(): Unit = {
|
||||
def unstashAll(): Unit = unstashAll(_ ⇒ true)
|
||||
|
||||
/**
|
||||
* INTERNAL API.
|
||||
*
|
||||
* Prepends selected messages in the stash, applying `filterPredicate`, to the
|
||||
* mailbox, and then clears the stash.
|
||||
*
|
||||
* Messages from the stash are enqueued to the mailbox until the capacity of the
|
||||
* mailbox (if any) has been reached. In case a bounded mailbox overflows, a
|
||||
* `MessageQueueAppendFailedException` is thrown.
|
||||
*
|
||||
* The stash is guaranteed to be empty after calling `unstashAll(Any => Boolean)`.
|
||||
*
|
||||
* @param filterPredicate only stashed messages selected by this predicate are
|
||||
* prepended to the mailbox.
|
||||
*/
|
||||
protected[akka] def unstashAll(filterPredicate: Any ⇒ Boolean): Unit = {
|
||||
try {
|
||||
val i = theStash.reverseIterator
|
||||
val i = theStash.reverseIterator.filter(envelope ⇒ filterPredicate(envelope.message))
|
||||
while (i.hasNext) mailbox.enqueueFirst(self, i.next())
|
||||
} finally {
|
||||
theStash = Vector.empty[Envelope]
|
||||
|
|
|
|||
172
akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java
Normal file
172
akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java
Normal file
|
|
@ -0,0 +1,172 @@
|
|||
package docs.persistence;
|
||||
|
||||
import scala.Option;
|
||||
|
||||
import akka.actor.*;
|
||||
import akka.persistence.*;
|
||||
|
||||
public class PersistenceDocTest {
|
||||
|
||||
public interface ProcessorMethods {
|
||||
//#processor-id
|
||||
public String processorId();
|
||||
//#processor-id
|
||||
//#recovery-status
|
||||
public boolean recoveryRunning();
|
||||
public boolean recoveryFinished();
|
||||
//#recovery-status
|
||||
//#current-message
|
||||
public Persistent getCurrentPersistentMessage();
|
||||
//#current-message
|
||||
}
|
||||
|
||||
static Object o1 = new Object() {
|
||||
//#definition
|
||||
class MyProcessor extends UntypedProcessor {
|
||||
public void onReceive(Object message) throws Exception {
|
||||
if (message instanceof Persistent) {
|
||||
// message has been written to journal
|
||||
Persistent persistent = (Persistent)message;
|
||||
Object payload = persistent.payload();
|
||||
Long sequenceNr = persistent.sequenceNr();
|
||||
// ...
|
||||
} else {
|
||||
// message has not been written to journal
|
||||
}
|
||||
}
|
||||
}
|
||||
//#definition
|
||||
|
||||
class MyActor extends UntypedActor {
|
||||
ActorRef processor;
|
||||
|
||||
public MyActor() {
|
||||
//#usage
|
||||
processor = getContext().actorOf(Props.create(MyProcessor.class), "myProcessor");
|
||||
|
||||
processor.tell(Persistent.create("foo"), null);
|
||||
processor.tell("bar", null);
|
||||
//#usage
|
||||
}
|
||||
|
||||
public void onReceive(Object message) throws Exception {
|
||||
// ...
|
||||
}
|
||||
|
||||
private void recover() {
|
||||
//#recover-explicit
|
||||
processor.tell(Recover.create(), null);
|
||||
//#recover-explicit
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
static Object o2 = new Object() {
|
||||
abstract class MyProcessor1 extends UntypedProcessor {
|
||||
//#recover-on-start-disabled
|
||||
@Override
|
||||
public void preStartProcessor() {}
|
||||
//#recover-on-start-disabled
|
||||
|
||||
//#recover-on-restart-disabled
|
||||
@Override
|
||||
public void preRestartProcessor(Throwable reason, Option<Object> message) {}
|
||||
//#recover-on-restart-disabled
|
||||
}
|
||||
|
||||
abstract class MyProcessor2 extends UntypedProcessor {
|
||||
//#recover-on-start-custom
|
||||
@Override
|
||||
public void preStartProcessor() {
|
||||
getSelf().tell(Recover.create(457L), null);
|
||||
}
|
||||
//#recover-on-start-custom
|
||||
}
|
||||
|
||||
abstract class MyProcessor3 extends UntypedProcessor {
|
||||
//#deletion
|
||||
@Override
|
||||
public void preRestartProcessor(Throwable reason, Option<Object> message) throws Exception {
|
||||
if (message.isDefined() && message.get() instanceof Persistent) {
|
||||
delete((Persistent) message.get());
|
||||
}
|
||||
super.preRestartProcessor(reason, message);
|
||||
}
|
||||
//#deletion
|
||||
}
|
||||
|
||||
class MyProcessor4 extends UntypedProcessor implements ProcessorMethods {
|
||||
//#processor-id-override
|
||||
@Override
|
||||
public String processorId() {
|
||||
return "my-stable-processor-id";
|
||||
}
|
||||
//#processor-id-override
|
||||
@Override
|
||||
public void onReceive(Object message) throws Exception {}
|
||||
}
|
||||
};
|
||||
|
||||
static Object o3 = new Object() {
|
||||
//#channel-example
|
||||
class MyProcessor extends UntypedProcessor {
|
||||
private final ActorRef destination;
|
||||
private final ActorRef channel;
|
||||
|
||||
public MyProcessor() {
|
||||
this.destination = getContext().actorOf(Props.create(MyDestination.class));
|
||||
this.channel = getContext().actorOf(Channel.props(), "myChannel");
|
||||
}
|
||||
|
||||
public void onReceive(Object message) throws Exception {
|
||||
if (message instanceof Persistent) {
|
||||
Persistent p = (Persistent)message;
|
||||
Persistent out = p.withPayload("done " + p.payload());
|
||||
channel.tell(Deliver.create(out, destination), getSelf());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class MyDestination extends UntypedActor {
|
||||
public void onReceive(Object message) throws Exception {
|
||||
if (message instanceof Persistent) {
|
||||
Persistent p = (Persistent)message;
|
||||
System.out.println("received " + p.payload());
|
||||
p.confirm();
|
||||
}
|
||||
}
|
||||
}
|
||||
//#channel-example
|
||||
|
||||
class MyProcessor2 extends UntypedProcessor {
|
||||
private final ActorRef destination;
|
||||
private final ActorRef channel;
|
||||
|
||||
public MyProcessor2(ActorRef destination) {
|
||||
this.destination = getContext().actorOf(Props.create(MyDestination.class));
|
||||
//#channel-id-override
|
||||
this.channel = getContext().actorOf(Channel.props("my-stable-channel-id"));
|
||||
//#channel-id-override
|
||||
}
|
||||
|
||||
public void onReceive(Object message) throws Exception {
|
||||
if (message instanceof Persistent) {
|
||||
Persistent p = (Persistent)message;
|
||||
Persistent out = p.withPayload("done " + p.payload());
|
||||
channel.tell(Deliver.create(out, destination), getSelf());
|
||||
|
||||
//#channel-example-reply
|
||||
channel.tell(Deliver.create(out, getSender()), getSelf());
|
||||
//#channel-example-reply
|
||||
//#resolve-destination
|
||||
channel.tell(Deliver.create(out, getSender(), Resolve.destination()), getSelf());
|
||||
//#resolve-destination
|
||||
//#resolve-sender
|
||||
channel.tell(Deliver.create(out, destination, Resolve.sender()), getSender());
|
||||
//#resolve-sender
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
@ -11,4 +11,5 @@ Actors
|
|||
mailboxes
|
||||
routing
|
||||
fsm
|
||||
persistence
|
||||
testing
|
||||
|
|
|
|||
234
akka-docs/rst/java/persistence.rst
Normal file
234
akka-docs/rst/java/persistence.rst
Normal file
|
|
@ -0,0 +1,234 @@
|
|||
.. _persistence-java:
|
||||
|
||||
###########
|
||||
Persistence
|
||||
###########
|
||||
|
||||
This section describes an early access version of the Akka persistence module. Akka persistence is heavily inspired
|
||||
by the `eventsourced`_ library. It follows the same concepts and architecture of `eventsourced`_ but significantly
|
||||
differs on API and implementation level.
|
||||
|
||||
.. warning::
|
||||
|
||||
This module is marked as **“experimental”** as of its introduction in Akka 2.3.0. We will continue to
|
||||
improve this API based on our users’ feedback, which implies that while we try to keep incompatible
|
||||
changes to a minimum the binary compatibility guarantee for maintenance releases does not apply to the
|
||||
contents of the ``akka.persistence`` package.
|
||||
|
||||
.. _eventsourced: https://github.com/eligosource/eventsourced
|
||||
|
||||
Dependencies
|
||||
============
|
||||
|
||||
Akka persistence is a separate jar file. Make sure that you have the following dependency in your project::
|
||||
|
||||
<dependency>
|
||||
<groupId>com.typesafe.akka</groupId>
|
||||
<artifactId>akka-persistence_@binVersion@</artifactId>
|
||||
<version>@version@</version>
|
||||
</dependency>
|
||||
|
||||
Architecture
|
||||
============
|
||||
|
||||
* *Processor*: A processor is a persistent actor. Messages sent to a processor are written to a journal before
|
||||
its ``onReceive`` method is called. When a processor is started or restarted, journaled messages are replayed
|
||||
to that processor, so that it can recover internal state from these messages.
|
||||
|
||||
* *Channel*: Channels are used by processors to communicate with other actors. They prevent that replayed messages
|
||||
are redundantly delivered to these actors.
|
||||
|
||||
Use cases
|
||||
=========
|
||||
|
||||
* TODO: describe command sourcing
|
||||
* TODO: describe event sourcing
|
||||
|
||||
Configuration
|
||||
=============
|
||||
|
||||
By default, journaled messages are written to a directory named ``journal`` in the current working directory. This
|
||||
can be changed by configuration where the specified path can be relative or absolute:
|
||||
|
||||
.. includecode:: ../scala/code/docs/persistence/PersistenceDocSpec.scala#config
|
||||
|
||||
.. _processors-java:
|
||||
|
||||
Processors
|
||||
==========
|
||||
|
||||
A processor can be implemented by extending the abstract ``UntypedProcessor`` class and implementing the
|
||||
``onReceive`` method.
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocTest.java#definition
|
||||
|
||||
Processors only write messages of type ``Persistent`` to the journal, others are received without being persisted.
|
||||
When a processor's ``onReceive`` method is called with a ``Persistent`` message it can safely assume that this message
|
||||
has been successfully written to the journal. A ``UntypedProcessor`` itself is an ``Actor`` and can therefore
|
||||
be instantiated with ``actorOf``.
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocTest.java#usage
|
||||
|
||||
Recovery
|
||||
--------
|
||||
|
||||
By default, a processor is automatically recovered on start and on restart by replaying persistent messages.
|
||||
New messages sent to a processor during recovery do not interfere with replayed messages. New messages will
|
||||
only be received by that processor after recovery completes.
|
||||
|
||||
Recovery customization
|
||||
^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
Automated recovery on start can be disabled by overriding ``preStartProcessor`` with an empty implementation.
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocTest.java#recover-on-start-disabled
|
||||
|
||||
In this case, a processor must be recovered explicitly by sending it a ``Recover`` message.
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocTest.java#recover-explicit
|
||||
|
||||
If not overridden, ``preStartProcessor`` sends a ``Recover`` message to ``getSelf()``. Applications may also override
|
||||
``preStartProcessor`` to define further ``Recover`` parameters such as an upper sequence number bound, for example.
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocTest.java#recover-on-start-custom
|
||||
|
||||
Automated recovery on restart can be disabled by overriding ``preRestartProcessor`` with an empty implementation.
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocTest.java#recover-on-restart-disabled
|
||||
|
||||
This is useful in situations where processors are *resumed* by a supervisor (which keeps accumulated internal
|
||||
state and makes a message replay unnecessary).
|
||||
|
||||
Recovery status
|
||||
^^^^^^^^^^^^^^^
|
||||
|
||||
A processor can query its own recovery status via the methods
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocTest.java#recovery-status
|
||||
|
||||
.. _failure-handling-java:
|
||||
|
||||
Failure handling
|
||||
^^^^^^^^^^^^^^^^
|
||||
|
||||
A persistent message that caused an exception will be received again by a processor after restart. To prevent
|
||||
a replay of that message during recovery it can be marked as deleted.
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocTest.java#deletion
|
||||
|
||||
Life cycle hooks
|
||||
----------------
|
||||
|
||||
``UntypedProcessor`` implementation classes should override the ``preStartProcessor``, ``preRestartProcessor``,
|
||||
``postRestartProcessor`` and ``postStopProcessor`` life cycle hooks and not ``preStart``, ``preRestart``,
|
||||
``postRestart`` and ``postStop`` directly.
|
||||
|
||||
Identifiers
|
||||
-----------
|
||||
|
||||
A processor must have an identifier that doesn't change across different actor incarnations. It defaults to the
|
||||
``String`` representation of processor's path and can be obtained via the ``processorId`` method.
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocTest.java#processor-id
|
||||
|
||||
Applications can customize a processor's id by specifying an actor name during processor creation as shown in
|
||||
section :ref:`processors-java`. This works well when using local actor references but may cause problems with remote
|
||||
actor references because their paths also contain deployment information such as host and port (and actor deployments
|
||||
are likely to change during the lifetime of an application). In this case, ``UntypedProcessor`` implementation classes
|
||||
should override ``processorId``.
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocTest.java#processor-id-override
|
||||
|
||||
Later versions of the Akka persistence module will likely offer a possibility to migrate processor ids.
|
||||
|
||||
Channels
|
||||
========
|
||||
|
||||
Channels are special actors that are used by processors to communicate with other actors (channel destinations).
|
||||
Channels prevent redundant delivery of replayed messages to destinations during processor recovery. A replayed
|
||||
message is retained by a channel if its previous delivery has been confirmed by a destination.
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocTest.java#channel-example
|
||||
|
||||
A channel is ready to use once it has been created, no recovery or further activation is needed. A ``Deliver``
|
||||
request instructs a channel to send a ``Persistent`` message to a destination where the sender of the ``Deliver``
|
||||
request is forwarded to the destination. A processor may also reply to a message sender directly by using
|
||||
``getSender()`` as channel destination.
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocTest.java#channel-example-reply
|
||||
|
||||
Channel destinations confirm the delivery of a ``Persistent`` message by calling its ``confirm()`` method. This
|
||||
(asynchronously) writes a confirmation entry to the journal. Replayed messages internally contain these confirmation
|
||||
entries which allows a channel to decide if a message should be retained or not.
|
||||
|
||||
If an application crashes after a destination called ``confirm()`` but before the confirmation entry could have
|
||||
been written to the journal then the unconfirmed message will be delivered again during next recovery and it is
|
||||
the destination's responsibility to detect the duplicate or simply process the message again if it's an idempotent
|
||||
receiver. Duplicates can be detected, for example, by tracking sequence numbers.
|
||||
|
||||
Currently, channels do not store ``Deliver`` requests or retry delivery on network or destination failures. This
|
||||
feature (*reliable channels*) will be available soon.
|
||||
|
||||
Sender resolution
|
||||
-----------------
|
||||
|
||||
``ActorRef`` s of ``Persistent`` message senders are also stored in the journal. Consequently, they may become invalid if
|
||||
an application is restarted and messages are replayed. For example, the stored ``ActorRef`` may then reference
|
||||
a previous incarnation of a sender and a new incarnation of that sender cannot receive a reply from a processor.
|
||||
This may be acceptable for many applications but others may require that a new sender incarnation receives the
|
||||
reply (to reliably resume a conversation between actors after a JVM crash, for example). Here, a channel may
|
||||
assist in resolving new sender incarnations by specifying a third ``Deliver`` argument:
|
||||
|
||||
* ``Resolve.destination()`` if the sender of a persistent message is used as channel destination
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocTest.java#resolve-destination
|
||||
|
||||
* ``Resolve.sender()`` if the sender of a persistent message is forwarded to a destination.
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocTest.java#resolve-sender
|
||||
|
||||
Default is ``Resolve.off()`` which means no resolution. Find out more in the ``Deliver`` API docs.
|
||||
|
||||
Identifiers
|
||||
-----------
|
||||
|
||||
In the same way as :ref:`processors-java`, channels also have an identifier that defaults to a channel's path. A channel
|
||||
identifier can therefore be customized by using a custom actor name at channel creation. As already mentioned, this
|
||||
works well when using local actor references but may cause problems with remote actor references. In this case, an
|
||||
application-defined channel id should be provided as argument to ``Channel.props(String)``
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocTest.java#channel-id-override
|
||||
|
||||
Persistent messages
|
||||
===================
|
||||
|
||||
Payload
|
||||
-------
|
||||
|
||||
The payload of a ``Persistent`` message can be obtained via its ``payload`` method. Inside processors, new messages
|
||||
must be derived from the current persistent message before sending them via a channel, either by calling ``p.withPayload(...)``
|
||||
or ``Persistent.create(..., getCurrentPersistentMessage())`` where ``getCurrentPersistentMessage()`` is defined on
|
||||
``UntypedProcessor``.
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocTest.java#current-message
|
||||
|
||||
This is necessary for delivery confirmations to work properly. Both
|
||||
ways are equivalent but we recommend using ``p.withPayload(...)`` for clarity. It is not allowed to send a message
|
||||
via a channel that has been created with ``Persistent.create(...)``. This would redeliver the message on every replay
|
||||
even though its delivery was confirmed by a destination.
|
||||
|
||||
Sequence number
|
||||
---------------
|
||||
|
||||
The sequence number of a ``Persistent`` message can be obtained via its ``sequenceNr`` method. Persistent
|
||||
messages are assigned sequence numbers on a per-processor basis. A sequence starts at ``1L`` and doesn't contain
|
||||
gaps unless a processor marks a message as deleted.
|
||||
|
||||
Upcoming features
|
||||
=================
|
||||
|
||||
* Snapshot based recovery
|
||||
* Configurable serialization
|
||||
* Reliable channels
|
||||
* Journal plugin API
|
||||
* ...
|
||||
|
|
@ -0,0 +1,187 @@
|
|||
package docs.persistence
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import akka.persistence.{ Recover, Persistent, Processor }
|
||||
import akka.testkit.{ ImplicitSender, AkkaSpec }
|
||||
|
||||
trait PersistenceDocSpec {
|
||||
val system: ActorSystem
|
||||
val config =
|
||||
"""
|
||||
//#config
|
||||
akka.persistence.journal.leveldb.dir = "target/journal"
|
||||
//#config
|
||||
"""
|
||||
|
||||
import system._
|
||||
|
||||
new AnyRef {
|
||||
//#definition
|
||||
import akka.persistence.{ Persistent, Processor }
|
||||
|
||||
class MyProcessor extends Processor {
|
||||
def receive = {
|
||||
case Persistent(payload, sequenceNr) ⇒ // message has been written to journal
|
||||
case other ⇒ // message has not been written to journal
|
||||
}
|
||||
}
|
||||
//#definition
|
||||
|
||||
//#usage
|
||||
import akka.actor.Props
|
||||
|
||||
val processor = actorOf(Props[MyProcessor], name = "myProcessor")
|
||||
|
||||
processor ! Persistent("foo") // will be journaled
|
||||
processor ! "bar" // will not be journaled
|
||||
//#usage
|
||||
|
||||
//#recover-explicit
|
||||
processor ! Recover()
|
||||
//#recover-explicit
|
||||
}
|
||||
|
||||
new AnyRef {
|
||||
trait MyProcessor1 extends Processor {
|
||||
//#recover-on-start-disabled
|
||||
override def preStartProcessor() = ()
|
||||
//#recover-on-start-disabled
|
||||
//#recover-on-restart-disabled
|
||||
override def preRestartProcessor(reason: Throwable, message: Option[Any]) = ()
|
||||
//#recover-on-restart-disabled
|
||||
}
|
||||
|
||||
trait MyProcessor2 extends Processor {
|
||||
//#recover-on-start-custom
|
||||
override def preStartProcessor() {
|
||||
self ! Recover(toSequenceNr = 457L)
|
||||
}
|
||||
//#recover-on-start-custom
|
||||
}
|
||||
|
||||
trait MyProcessor3 extends Processor {
|
||||
//#deletion
|
||||
override def preRestartProcessor(reason: Throwable, message: Option[Any]) {
|
||||
message match {
|
||||
case Some(p: Persistent) ⇒ delete(p)
|
||||
case _ ⇒
|
||||
}
|
||||
super.preRestartProcessor(reason, message)
|
||||
}
|
||||
//#deletion
|
||||
}
|
||||
}
|
||||
|
||||
new AnyRef {
|
||||
trait ProcessorMethods {
|
||||
//#processor-id
|
||||
def processorId: String
|
||||
//#processor-id
|
||||
//#recovery-status
|
||||
def recoveryRunning: Boolean
|
||||
def recoveryFinished: Boolean
|
||||
//#recovery-status
|
||||
//#current-message
|
||||
implicit def currentPersistentMessage: Option[Persistent]
|
||||
//#current-message
|
||||
}
|
||||
class MyProcessor1 extends Processor with ProcessorMethods {
|
||||
//#processor-id-override
|
||||
override def processorId = "my-stable-processor-id"
|
||||
//#processor-id-override
|
||||
def receive = {
|
||||
case _ ⇒
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
new AnyRef {
|
||||
//#channel-example
|
||||
import akka.actor.{ Actor, Props }
|
||||
import akka.persistence.{ Channel, Deliver, Persistent, Processor }
|
||||
|
||||
class MyProcessor extends Processor {
|
||||
val destination = context.actorOf(Props[MyDestination])
|
||||
val channel = context.actorOf(Channel.props(), name = "myChannel")
|
||||
|
||||
def receive = {
|
||||
case p @ Persistent(payload, _) ⇒ {
|
||||
channel ! Deliver(p.withPayload(s"processed ${payload}"), destination)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class MyDestination extends Actor {
|
||||
def receive = {
|
||||
case p @ Persistent(payload, _) ⇒ {
|
||||
println(s"received ${payload}")
|
||||
p.confirm()
|
||||
}
|
||||
}
|
||||
}
|
||||
//#channel-example
|
||||
|
||||
class MyProcessor2 extends Processor {
|
||||
import akka.persistence.Resolve
|
||||
|
||||
val destination = context.actorOf(Props[MyDestination])
|
||||
val channel =
|
||||
//#channel-id-override
|
||||
context.actorOf(Channel.props("my-stable-channel-id"))
|
||||
//#channel-id-override
|
||||
|
||||
def receive = {
|
||||
case p @ Persistent(payload, _) ⇒ {
|
||||
//#channel-example-reply
|
||||
channel ! Deliver(p.withPayload(s"processed ${payload}"), sender)
|
||||
//#channel-example-reply
|
||||
//#resolve-destination
|
||||
channel ! Deliver(p, sender, Resolve.Destination)
|
||||
//#resolve-destination
|
||||
//#resolve-sender
|
||||
channel forward Deliver(p, destination, Resolve.Sender)
|
||||
//#resolve-sender
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class MyProcessor3 extends Processor {
|
||||
def receive = {
|
||||
//#payload-pattern-matching
|
||||
case Persistent(payload, _) ⇒
|
||||
//#payload-pattern-matching
|
||||
}
|
||||
}
|
||||
|
||||
class MyProcessor4 extends Processor {
|
||||
def receive = {
|
||||
//#sequence-nr-pattern-matching
|
||||
case Persistent(_, sequenceNr) ⇒
|
||||
//#sequence-nr-pattern-matching
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
new AnyRef {
|
||||
//#fsm-example
|
||||
import akka.actor.FSM
|
||||
import akka.persistence.{ Processor, Persistent }
|
||||
|
||||
class PersistentDoor extends Processor with FSM[String, Int] {
|
||||
startWith("closed", 0)
|
||||
|
||||
when("closed") {
|
||||
case Event(Persistent("open", _), counter) ⇒ {
|
||||
goto("open") using (counter + 1) replying (counter)
|
||||
}
|
||||
}
|
||||
|
||||
when("open") {
|
||||
case Event(Persistent("close", _), counter) ⇒ {
|
||||
goto("closed") using (counter + 1) replying (counter)
|
||||
}
|
||||
}
|
||||
}
|
||||
//#fsm-example
|
||||
}
|
||||
}
|
||||
|
|
@ -12,4 +12,5 @@ Actors
|
|||
mailboxes
|
||||
routing
|
||||
fsm
|
||||
persistence
|
||||
testing
|
||||
|
|
|
|||
252
akka-docs/rst/scala/persistence.rst
Normal file
252
akka-docs/rst/scala/persistence.rst
Normal file
|
|
@ -0,0 +1,252 @@
|
|||
.. _persistence:
|
||||
|
||||
###########
|
||||
Persistence
|
||||
###########
|
||||
|
||||
This section describes an early access version of the Akka persistence module. Akka persistence is heavily inspired
|
||||
by the `eventsourced`_ library. It follows the same concepts and architecture of `eventsourced`_ but significantly
|
||||
differs on API and implementation level.
|
||||
|
||||
.. warning::
|
||||
|
||||
This module is marked as **“experimental”** as of its introduction in Akka 2.3.0. We will continue to
|
||||
improve this API based on our users’ feedback, which implies that while we try to keep incompatible
|
||||
changes to a minimum the binary compatibility guarantee for maintenance releases does not apply to the
|
||||
contents of the ``akka.persistence`` package.
|
||||
|
||||
.. _eventsourced: https://github.com/eligosource/eventsourced
|
||||
|
||||
Dependencies
|
||||
============
|
||||
|
||||
Akka persistence is a separate jar file. Make sure that you have the following dependency in your project::
|
||||
|
||||
"com.typesafe.akka" %% "akka-persistence" % "@version@" @crossString@
|
||||
|
||||
Architecture
|
||||
============
|
||||
|
||||
* *Processor*: A processor is a persistent actor. Messages sent to a processor are written to a journal before
|
||||
its ``receive`` method is called. When a processor is started or restarted, journaled messages are replayed
|
||||
to that processor, so that it can recover internal state from these messages.
|
||||
|
||||
* *Channel*: Channels are used by processors to communicate with other actors. They prevent that replayed messages
|
||||
are redundantly delivered to these actors.
|
||||
|
||||
Use cases
|
||||
=========
|
||||
|
||||
* TODO: describe command sourcing
|
||||
* TODO: describe event sourcing
|
||||
|
||||
Configuration
|
||||
=============
|
||||
|
||||
By default, journaled messages are written to a directory named ``journal`` in the current working directory. This
|
||||
can be changed by configuration where the specified path can be relative or absolute:
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#config
|
||||
|
||||
.. _processors:
|
||||
|
||||
Processors
|
||||
==========
|
||||
|
||||
A processor can be implemented by extending the ``Processor`` trait and implementing the ``receive`` method.
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#definition
|
||||
|
||||
Processors only write messages of type ``Persistent`` to the journal, others are received without being persisted.
|
||||
When a processor's ``receive`` method is called with a ``Persistent`` message it can safely assume that this message
|
||||
has been successfully written to the journal. A ``Processor`` itself is an ``Actor`` and can therefore be instantiated
|
||||
with ``actorOf``.
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#usage
|
||||
|
||||
Recovery
|
||||
--------
|
||||
|
||||
By default, a processor is automatically recovered on start and on restart by replaying journaled messages.
|
||||
New messages sent to a processor during recovery do not interfere with replayed messages. New messages will
|
||||
only be received by that processor after recovery completes.
|
||||
|
||||
Recovery customization
|
||||
^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
Automated recovery on start can be disabled by overriding ``preStartProcessor`` with an empty implementation.
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recover-on-start-disabled
|
||||
|
||||
In this case, a processor must be recovered explicitly by sending it a ``Recover()`` message.
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recover-explicit
|
||||
|
||||
If not overridden, ``preStartProcessor`` sends a ``Recover()`` message to ``self``. Applications may also override
|
||||
``preStartProcessor`` to define further ``Recover()`` parameters such as an upper sequence number bound, for example.
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recover-on-start-custom
|
||||
|
||||
Automated recovery on restart can be disabled by overriding ``preRestartProcessor`` with an empty implementation.
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recover-on-restart-disabled
|
||||
|
||||
This is useful in situations where processors are *resumed* by a supervisor (which keeps accumulated internal
|
||||
state and makes a message replay unnecessary).
|
||||
|
||||
Recovery status
|
||||
^^^^^^^^^^^^^^^
|
||||
|
||||
A processor can query its own recovery status via the methods
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recovery-status
|
||||
|
||||
.. _failure-handling:
|
||||
|
||||
Failure handling
|
||||
^^^^^^^^^^^^^^^^
|
||||
|
||||
A persistent message that caused an exception will be received again by a processor after restart. To prevent
|
||||
a replay of that message during recovery it can be marked as deleted.
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#deletion
|
||||
|
||||
Life cycle hooks
|
||||
----------------
|
||||
|
||||
``Processor`` implementation classes should override the ``preStartProcessor``, ``preRestartProcessor``,
|
||||
``postRestartProcessor`` and ``postStopProcessor`` life cycle hooks and not ``preStart``, ``preRestart``,
|
||||
``postRestart`` and ``postStop`` directly. The latter are nevertheless non-final to allow composition with
|
||||
existing traits such as ``akka.actor.FSM``, for example.
|
||||
|
||||
Identifiers
|
||||
-----------
|
||||
|
||||
A processor must have an identifier that doesn't change across different actor incarnations. It defaults to the
|
||||
``String`` representation of processor's path and can be obtained via the ``processorId`` method.
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#processor-id
|
||||
|
||||
Applications can customize a processor's id by specifying an actor name during processor creation as shown in
|
||||
section :ref:`processors`. This works well when using local actor references but may cause problems with remote
|
||||
actor references because their paths also contain deployment information such as host and port (and actor deployments
|
||||
are likely to change during the lifetime of an application). In this case, ``Processor`` implementation classes
|
||||
should override ``processorId``.
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#processor-id-override
|
||||
|
||||
Later versions of the Akka persistence module will likely offer a possibility to migrate processor ids.
|
||||
|
||||
Channels
|
||||
========
|
||||
|
||||
Channels are special actors that are used by processors to communicate with other actors (channel destinations).
|
||||
Channels prevent redundant delivery of replayed messages to destinations during processor recovery. A replayed
|
||||
message is retained by a channel if its previous delivery has been confirmed by a destination.
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#channel-example
|
||||
|
||||
A channel is ready to use once it has been created, no recovery or further activation is needed. A ``Deliver``
|
||||
request instructs a channel to send a ``Persistent`` message to a destination where the sender of the ``Deliver``
|
||||
request is forwarded to the destination. A processor may also reply to a message sender directly by using ``sender``
|
||||
as channel destination.
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#channel-example-reply
|
||||
|
||||
Channel destinations confirm the delivery of a ``Persistent`` message by calling its ``confirm()`` method. This
|
||||
(asynchronously) writes a confirmation entry to the journal. Replayed messages internally contain these confirmation
|
||||
entries which allows a channel to decide if a message should be retained or not.
|
||||
|
||||
If an application crashes after a destination called ``confirm()`` but before the confirmation entry could have
|
||||
been written to the journal then the unconfirmed message will be delivered again during next recovery and it is
|
||||
the destination's responsibility to detect the duplicate or simply process the message again if it's an idempotent
|
||||
receiver. Duplicates can be detected, for example, by tracking sequence numbers.
|
||||
|
||||
Currently, channels do not store ``Deliver`` requests or retry delivery on network or destination failures. This
|
||||
feature (*reliable channels*) will be available soon.
|
||||
|
||||
Sender resolution
|
||||
-----------------
|
||||
|
||||
``ActorRef`` s of ``Persistent`` message senders are also stored in the journal. Consequently, they may become invalid if
|
||||
an application is restarted and messages are replayed. For example, the stored ``ActorRef`` may then reference
|
||||
a previous incarnation of a sender and a new incarnation of that sender cannot receive a reply from a processor.
|
||||
This may be acceptable for many applications but others may require that a new sender incarnation receives the
|
||||
reply (to reliably resume a conversation between actors after a JVM crash, for example). Here, a channel may
|
||||
assist in resolving new sender incarnations by specifying a third ``Deliver`` argument:
|
||||
|
||||
* ``Resolve.Destination`` if the sender of a persistent message is used as channel destination
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#resolve-destination
|
||||
|
||||
* ``Resolve.Sender`` if the sender of a persistent message is forwarded to a destination.
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#resolve-sender
|
||||
|
||||
Default is ``Resolve.Off`` which means no resolution. Find out more in the ``Deliver`` API docs.
|
||||
|
||||
Identifiers
|
||||
-----------
|
||||
|
||||
In the same way as :ref:`processors`, channels also have an identifier that defaults to a channel's path. A channel
|
||||
identifier can therefore be customized by using a custom actor name at channel creation. As already mentioned, this
|
||||
works well when using local actor references but may cause problems with remote actor references. In this case, an
|
||||
application-defined channel id should be provided as argument to ``Channel.props(String)``
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#channel-id-override
|
||||
|
||||
Persistent messages
|
||||
===================
|
||||
|
||||
Payload
|
||||
-------
|
||||
|
||||
The payload of a ``Persistent`` message can be obtained via its
|
||||
|
||||
.. includecode:: ../../../akka-persistence/src/main/scala/akka/persistence/Persistent.scala#payload
|
||||
|
||||
method or by pattern matching
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#payload-pattern-matching
|
||||
|
||||
Inside processors, new persistent messages are derived from the current persistent message before sending them via a
|
||||
channel, either by calling ``p.withPayload(...)`` or ``Persistent.create(...)`` where the latter uses the
|
||||
implicit ``currentPersistentMessage`` made available by ``Processor``.
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#current-message
|
||||
|
||||
This is necessary for delivery confirmations to work properly. Both ways are equivalent but we recommend
|
||||
using ``p.withPayload(...)`` for clarity.
|
||||
|
||||
Sequence number
|
||||
---------------
|
||||
|
||||
The sequence number of a ``Persistent`` message can be obtained via its
|
||||
|
||||
.. includecode:: ../../../akka-persistence/src/main/scala/akka/persistence/Persistent.scala#sequence-nr
|
||||
|
||||
method or by pattern matching
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#sequence-nr-pattern-matching
|
||||
|
||||
Persistent messages are assigned sequence numbers on a per-processor basis. A sequence starts at ``1L`` and
|
||||
doesn't contain gaps unless a processor marks a message as deleted.
|
||||
|
||||
Miscellaneous
|
||||
=============
|
||||
|
||||
State machines
|
||||
--------------
|
||||
|
||||
State machines can be persisted by mixing in the ``FSM`` trait into processors.
|
||||
|
||||
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#fsm-example
|
||||
|
||||
Upcoming features
|
||||
=================
|
||||
|
||||
* Snapshot based recovery
|
||||
* Configurable serialization
|
||||
* Reliable channels
|
||||
* Journal plugin API
|
||||
* ...
|
||||
19
akka-persistence/src/main/resources/reference.conf
Normal file
19
akka-persistence/src/main/resources/reference.conf
Normal file
|
|
@ -0,0 +1,19 @@
|
|||
akka {
|
||||
persistence {
|
||||
journal {
|
||||
use = "leveldb"
|
||||
|
||||
inmem {
|
||||
// ...
|
||||
}
|
||||
|
||||
leveldb {
|
||||
dir = "journal"
|
||||
dispatcher {
|
||||
executor = "thread-pool-executor"
|
||||
type = PinnedDispatcher
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
271
akka-persistence/src/main/scala/akka/persistence/Channel.scala
Normal file
271
akka-persistence/src/main/scala/akka/persistence/Channel.scala
Normal file
|
|
@ -0,0 +1,271 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.persistence
|
||||
|
||||
import akka.actor._
|
||||
|
||||
/**
|
||||
* A channel is used by [[Processor]]s for sending received persistent messages to destinations.
|
||||
* It prevents redundant delivery of messages to these destinations when a processor is recovered
|
||||
* i.e. receives replayed messages. This requires that channel destinations confirm the receipt of
|
||||
* persistent messages by calling `confirm()` on the [[Persistent]] message.
|
||||
*
|
||||
* A channel can be instructed to deliver a persistent message to a `destination` via the [[Deliver]]
|
||||
* command.
|
||||
*
|
||||
* {{{
|
||||
* class ForwardExample extends Processor {
|
||||
* val destination = context.actorOf(Props[MyDestination])
|
||||
* val channel = context.actorOf(Channel.props(), "myChannel")
|
||||
*
|
||||
* def receive = {
|
||||
* case m @ Persistent(payload, _) => {
|
||||
* // forward modified message to destination
|
||||
* channel forward Deliver(m.withPayload(s"fw: ${payload}"), destination)
|
||||
* }
|
||||
* }
|
||||
* }
|
||||
* }}}
|
||||
*
|
||||
* To reply to the sender of a persistent message, the `sender` reference should be used as channel
|
||||
* destination.
|
||||
*
|
||||
* {{{
|
||||
* class ReplyExample extends Processor {
|
||||
* val channel = context.actorOf(Channel.props(), "myChannel")
|
||||
*
|
||||
* def receive = {
|
||||
* case m @ Persistent(payload, _) => {
|
||||
* // reply modified message to sender
|
||||
* channel ! Deliver(m.withPayload(s"re: ${payload}"), sender)
|
||||
* }
|
||||
* }
|
||||
* }
|
||||
* }}}
|
||||
*
|
||||
* @see [[Deliver]]
|
||||
*/
|
||||
class Channel private (_channelId: Option[String]) extends Actor with Stash {
|
||||
private val extension = Persistence(context.system)
|
||||
private val id = _channelId match {
|
||||
case Some(cid) ⇒ cid
|
||||
case None ⇒ extension.channelId(self)
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new channel with a generated channel id.
|
||||
*/
|
||||
def this() = this(None)
|
||||
|
||||
/**
|
||||
* Creates a new channel with specified channel id.
|
||||
*
|
||||
* @param channelId channel id.
|
||||
*/
|
||||
def this(channelId: String) = this(Some(channelId))
|
||||
|
||||
import ResolvedDelivery._
|
||||
|
||||
private val delivering: Actor.Receive = {
|
||||
case Deliver(p: PersistentImpl, destination, resolve) ⇒ {
|
||||
if (!p.confirms.contains(id)) {
|
||||
val msg = p.copy(channelId = id,
|
||||
confirmTarget = extension.journalFor(p.processorId),
|
||||
confirmMessage = Confirm(p.processorId, p.sequenceNr, id))
|
||||
resolve match {
|
||||
case Resolve.Sender if !p.resolved ⇒ {
|
||||
context.actorOf(Props(classOf[ResolvedSenderDelivery], msg, destination, sender)) ! DeliverResolved
|
||||
context.become(buffering, false)
|
||||
}
|
||||
case Resolve.Destination if !p.resolved ⇒ {
|
||||
context.actorOf(Props(classOf[ResolvedDestinationDelivery], msg, destination, sender)) ! DeliverResolved
|
||||
context.become(buffering, false)
|
||||
}
|
||||
case _ ⇒ destination tell (msg, sender)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private val buffering: Actor.Receive = {
|
||||
case DeliveredResolved | DeliveredUnresolved ⇒ context.unbecome(); unstashAll() // TODO: optimize
|
||||
case _: Deliver ⇒ stash()
|
||||
}
|
||||
|
||||
def receive = delivering
|
||||
}
|
||||
|
||||
object Channel {
|
||||
/**
|
||||
* Returns a channel configuration object for creating a [[Channel]] with a
|
||||
* generated id.
|
||||
*/
|
||||
def props(): Props = Props(classOf[Channel])
|
||||
|
||||
/**
|
||||
* Returns a channel configuration object for creating a [[Channel]] with the
|
||||
* specified id.
|
||||
*
|
||||
* @param channelId channel id.
|
||||
*/
|
||||
def props(channelId: String): Props = Props(classOf[Channel], channelId)
|
||||
}
|
||||
|
||||
/**
|
||||
* Instructs a [[Channel]] to deliver `persistent` message to destination `destination`.
|
||||
* The `resolve` parameter can be:
|
||||
*
|
||||
* - `Resolve.Destination`: will resolve a new destination reference from the specified
|
||||
* `destination`s path. The `persistent` message will be sent to the newly resolved
|
||||
* destination.
|
||||
* - `Resolve.Sender`: will resolve a new sender reference from this `Deliver` message's
|
||||
* `sender` path. The `persistent` message will be sent to the specified `destination`
|
||||
* using the newly resolved sender.
|
||||
* - `Resolve.Off`: will not do any resolution (default).
|
||||
*
|
||||
* Resolving an actor reference means first obtaining an `ActorSelection` from the path of
|
||||
* the reference to be resolved and then obtaining a new actor reference via an `Identify`
|
||||
* - `ActorIdentity` conversation. Actor reference resolution does not change the original
|
||||
* order of messages.
|
||||
*
|
||||
* Resolving actor references may become necessary when using the stored sender references
|
||||
* of replayed messages. A stored sender reference may become invalid (for example, it may
|
||||
* reference a previous sender incarnation, after a JVM restart). Depending on how a processor
|
||||
* uses sender references, two resolution strategies are relevant.
|
||||
*
|
||||
* - `Resolve.Sender` when a processor forwards a replayed message to a destination.
|
||||
*
|
||||
* {{{
|
||||
* channel forward Deliver(message, destination, Resolve.Sender)
|
||||
* }}}
|
||||
*
|
||||
* - `Resolve.Destination` when a processor replies to the sender of a replayed message. In
|
||||
* this case the sender is used as channel destination.
|
||||
*
|
||||
* {{{
|
||||
* channel ! Deliver(message, sender, Resolve.Destination)
|
||||
* }}}
|
||||
*
|
||||
* A destination or sender reference will only be resolved by a channel if
|
||||
*
|
||||
* - the `resolve` parameter is set to `Resolve.Destination` or `Resolve.Channel`
|
||||
* - the message is replayed
|
||||
* - the message is not retained by the channel and
|
||||
* - there was no previous successful resolve action for that message
|
||||
*
|
||||
* @param persistent persistent message.
|
||||
* @param destination persistent message destination.
|
||||
* @param resolve resolve strategy.
|
||||
*/
|
||||
@SerialVersionUID(1L)
|
||||
case class Deliver(persistent: Persistent, destination: ActorRef, resolve: Resolve.ResolveStrategy = Resolve.Off)
|
||||
|
||||
object Deliver {
|
||||
/**
|
||||
* Java API.
|
||||
*/
|
||||
def create(persistent: Persistent, destination: ActorRef) = Deliver(persistent, destination)
|
||||
|
||||
/**
|
||||
* Java API.
|
||||
*/
|
||||
def create(persistent: Persistent, destination: ActorRef, resolve: Resolve.ResolveStrategy) = Deliver(persistent, destination, resolve)
|
||||
}
|
||||
|
||||
/**
|
||||
* Actor reference resolution strategy.
|
||||
*
|
||||
* @see [[Deliver]]
|
||||
*/
|
||||
object Resolve {
|
||||
sealed abstract class ResolveStrategy
|
||||
|
||||
/**
|
||||
* No resolution.
|
||||
*/
|
||||
@SerialVersionUID(1L)
|
||||
case object Off extends ResolveStrategy
|
||||
|
||||
/**
|
||||
* [[Channel]] should resolve the `sender` of a [[Deliver]] message.
|
||||
*/
|
||||
@SerialVersionUID(1L)
|
||||
case object Sender extends ResolveStrategy
|
||||
|
||||
/**
|
||||
* [[Channel]] should resolve the `destination` of a [[Deliver]] message.
|
||||
*/
|
||||
@SerialVersionUID(1L)
|
||||
case object Destination extends ResolveStrategy
|
||||
|
||||
/**
|
||||
* Java API.
|
||||
*/
|
||||
def off() = Off
|
||||
|
||||
/**
|
||||
* Java API.
|
||||
*/
|
||||
def sender() = Sender
|
||||
|
||||
/**
|
||||
* Java API.
|
||||
*/
|
||||
def destination() = Destination
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolved delivery support.
|
||||
*/
|
||||
private trait ResolvedDelivery extends Actor {
|
||||
import scala.concurrent.duration._
|
||||
import scala.language.postfixOps
|
||||
import ResolvedDelivery._
|
||||
|
||||
context.setReceiveTimeout(5 seconds) // TODO: make configurable
|
||||
|
||||
def path: ActorPath
|
||||
def onResolveSuccess(ref: ActorRef): Unit
|
||||
def onResolveFailure(): Unit
|
||||
|
||||
def receive = {
|
||||
case DeliverResolved ⇒ context.actorSelection(path) ! Identify(1)
|
||||
case ActorIdentity(1, Some(ref)) ⇒ onResolveSuccess(ref); shutdown(DeliveredResolved)
|
||||
case ActorIdentity(1, None) ⇒ onResolveFailure(); shutdown(DeliveredUnresolved)
|
||||
case ReceiveTimeout ⇒ onResolveFailure(); shutdown(DeliveredUnresolved)
|
||||
}
|
||||
|
||||
def shutdown(message: Any) {
|
||||
context.parent ! message
|
||||
context.stop(self)
|
||||
}
|
||||
}
|
||||
|
||||
private object ResolvedDelivery {
|
||||
case object DeliverResolved
|
||||
case object DeliveredResolved
|
||||
case object DeliveredUnresolved
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolves `destination` before sending `persistent` message to the resolved destination using
|
||||
* the specified sender (`sdr`) as message sender.
|
||||
*/
|
||||
private class ResolvedDestinationDelivery(persistent: PersistentImpl, destination: ActorRef, sdr: ActorRef) extends ResolvedDelivery {
|
||||
val path = destination.path
|
||||
def onResolveSuccess(ref: ActorRef) = ref tell (persistent.copy(resolved = true), sdr)
|
||||
def onResolveFailure() = destination tell (persistent, sdr)
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolves `sdr` before sending `persistent` message to specified `destination` using
|
||||
* the resolved sender as message sender.
|
||||
*/
|
||||
private class ResolvedSenderDelivery(persistent: PersistentImpl, destination: ActorRef, sdr: ActorRef) extends ResolvedDelivery {
|
||||
val path = sdr.path
|
||||
def onResolveSuccess(ref: ActorRef) = destination tell (persistent.copy(resolved = true), ref)
|
||||
def onResolveFailure() = destination tell (persistent, sdr)
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,85 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.persistence
|
||||
|
||||
import akka.actor._
|
||||
|
||||
private[persistence] trait JournalFactory {
|
||||
/**
|
||||
*
|
||||
* Creates a new journal actor.
|
||||
*/
|
||||
def createJournal(implicit factory: ActorRefFactory): ActorRef
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines messages exchanged between processors, channels and a journal.
|
||||
*/
|
||||
private[persistence] object Journal {
|
||||
/**
|
||||
* Instructs a journal to mark the `persistent` message as deleted.
|
||||
* A persistent message marked as deleted is not replayed during recovery.
|
||||
*
|
||||
* @param persistent persistent message.
|
||||
*/
|
||||
case class Delete(persistent: Persistent)
|
||||
|
||||
/**
|
||||
* Instructs a journal to persist a message.
|
||||
*
|
||||
* @param persistent message to be persisted.
|
||||
* @param processor requesting processor.
|
||||
*/
|
||||
case class Write(persistent: PersistentImpl, processor: ActorRef)
|
||||
|
||||
/**
|
||||
* Reply message to a processor that `persistent` message has been journaled.
|
||||
*
|
||||
* @param persistent persistent message.
|
||||
*/
|
||||
case class Written(persistent: PersistentImpl)
|
||||
|
||||
/**
|
||||
* Instructs a journal to loop a `message` back to `processor`, without persisting the
|
||||
* message. Looping of messages through a journal is required to preserve message order
|
||||
* with persistent messages.
|
||||
*
|
||||
* @param message message to be looped through the journal.
|
||||
* @param processor requesting processor.
|
||||
*/
|
||||
case class Loop(message: Any, processor: ActorRef)
|
||||
|
||||
/**
|
||||
* Reply message to a processor that a `message` has been looped through the journal.
|
||||
*
|
||||
* @param message looped message.
|
||||
*/
|
||||
case class Looped(message: Any)
|
||||
|
||||
/**
|
||||
* Instructs a journal to replay persistent messages to `processor`, identified by
|
||||
* `processorId`. Messages are replayed up to sequence number `toSequenceNr` (inclusive).
|
||||
*
|
||||
* @param toSequenceNr upper sequence number bound (inclusive) for replay.
|
||||
* @param processor processor that receives the replayed messages.
|
||||
* @param processorId processor id.
|
||||
*/
|
||||
case class Replay(toSequenceNr: Long, processor: ActorRef, processorId: String)
|
||||
|
||||
/**
|
||||
* Wrapper for a replayed `persistent` message.
|
||||
*
|
||||
* @param persistent persistent message.
|
||||
*/
|
||||
case class Replayed(persistent: PersistentImpl)
|
||||
|
||||
/**
|
||||
* Message sent to a processor after the last [[Replayed]] message.
|
||||
*
|
||||
* @param maxSequenceNr the highest stored sequence number (for a processor).
|
||||
*/
|
||||
case class RecoveryEnd(maxSequenceNr: Long)
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,66 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.persistence
|
||||
|
||||
import com.typesafe.config.Config
|
||||
|
||||
import akka.actor._
|
||||
import akka.persistence.journal._
|
||||
|
||||
/**
|
||||
* Akka persistence extension.
|
||||
*/
|
||||
object Persistence extends ExtensionId[Persistence] with ExtensionIdProvider {
|
||||
class Settings(config: Config) {
|
||||
val rootConfig = config.getConfig("akka.persistence.journal")
|
||||
val journalName = rootConfig.getString("use")
|
||||
val journalConfig = rootConfig.getConfig(journalName)
|
||||
val journalFactory = journalName match {
|
||||
case "inmem" ⇒ new InmemJournalSettings(journalConfig)
|
||||
case "leveldb" ⇒ new LeveldbJournalSettings(journalConfig)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Java API.
|
||||
*/
|
||||
override def get(system: ActorSystem): Persistence = super.get(system)
|
||||
|
||||
def createExtension(system: ExtendedActorSystem): Persistence = new Persistence(system)
|
||||
|
||||
def lookup() = Persistence
|
||||
}
|
||||
|
||||
/**
|
||||
* Akka persistence extension.
|
||||
*/
|
||||
class Persistence(val system: ExtendedActorSystem) extends Extension {
|
||||
private val settings = new Persistence.Settings(system.settings.config)
|
||||
private val journal = settings.journalFactory.createJournal(system)
|
||||
// TODO: journal should have its own dispatcher
|
||||
|
||||
/**
|
||||
* Returns a journal for processor identified by `pid`.
|
||||
*
|
||||
* @param processorId processor id.
|
||||
*/
|
||||
def journalFor(processorId: String): ActorRef = {
|
||||
// Currently returns a journal singleton is returned but this methods allows
|
||||
// for later optimisations where each processor can have its own journal actor.
|
||||
journal
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a canonical processor id from a processor actor ref.
|
||||
*/
|
||||
def processorId(processor: ActorRef): String = id(processor)
|
||||
|
||||
/**
|
||||
* Creates a canonical channel id from a channel actor ref.
|
||||
*/
|
||||
def channelId(channel: ActorRef): String = id(channel)
|
||||
|
||||
private def id(ref: ActorRef) = ref.path.toStringWithAddress(system.provider.getDefaultAddress)
|
||||
}
|
||||
|
|
@ -0,0 +1,103 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.persistence
|
||||
|
||||
import akka.actor.ActorRef
|
||||
|
||||
/**
|
||||
* Persistent message.
|
||||
*/
|
||||
sealed abstract class Persistent {
|
||||
/**
|
||||
* This persistent message's payload.
|
||||
*/
|
||||
//#payload
|
||||
def payload: Any
|
||||
//#payload
|
||||
|
||||
/**
|
||||
* This persistent message's sequence number.
|
||||
*/
|
||||
//#sequence-nr
|
||||
def sequenceNr: Long
|
||||
//#sequence-nr
|
||||
|
||||
/**
|
||||
* Creates a new persistent message with the specified `payload`.
|
||||
*/
|
||||
def withPayload(payload: Any): Persistent
|
||||
|
||||
/**
|
||||
* Called by [[Channel]] destinations to confirm the receipt of a persistent message.
|
||||
*/
|
||||
def confirm(): Unit
|
||||
}
|
||||
|
||||
object Persistent {
|
||||
/**
|
||||
* Java API.
|
||||
*
|
||||
* Creates a new persistent message. Must only be used outside processors.
|
||||
*
|
||||
* @param payload payload of new persistent message.
|
||||
*/
|
||||
def create(payload: Any): Persistent =
|
||||
create(payload, null)
|
||||
|
||||
/**
|
||||
* Java API.
|
||||
*
|
||||
* Creates a new persistent message, derived from the specified current message. The current
|
||||
* message can be obtained inside a [[Processor]] by calling `getCurrentPersistentMessage()`.
|
||||
*
|
||||
* @param payload payload of new persistent message.
|
||||
* @param currentPersistentMessage current persistent message.
|
||||
*/
|
||||
def create(payload: Any, currentPersistentMessage: Persistent): Persistent =
|
||||
apply(payload)(Option(currentPersistentMessage))
|
||||
|
||||
/**
|
||||
* Creates a new persistent message, derived from an implicit current message.
|
||||
* When used inside a [[Processor]], this is the optional current [[Message]]
|
||||
* of that processor.
|
||||
*
|
||||
* @param payload payload of the new persistent message.
|
||||
* @param currentPersistentMessage optional current persistent message, defaults to `None`.
|
||||
*/
|
||||
def apply(payload: Any)(implicit currentPersistentMessage: Option[Persistent] = None): Persistent =
|
||||
currentPersistentMessage.map(_.withPayload(payload)).getOrElse(PersistentImpl(payload))
|
||||
|
||||
/**
|
||||
* Persistent message extractor.
|
||||
*/
|
||||
def unapply(persistent: Persistent): Option[(Any, Long)] =
|
||||
Some((persistent.payload, persistent.sequenceNr))
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API.
|
||||
*
|
||||
* Internal [[Persistent]] representation.
|
||||
*/
|
||||
private[persistence] case class PersistentImpl(
|
||||
payload: Any,
|
||||
sequenceNr: Long = 0L,
|
||||
resolved: Boolean = true,
|
||||
processorId: String = "",
|
||||
channelId: String = "",
|
||||
sender: String = "",
|
||||
confirms: Seq[String] = Nil,
|
||||
confirmTarget: ActorRef = null,
|
||||
confirmMessage: Confirm = null) extends Persistent {
|
||||
|
||||
def withPayload(payload: Any): Persistent = copy(payload = payload)
|
||||
def confirm(): Unit = if (confirmTarget != null) confirmTarget ! confirmMessage
|
||||
}
|
||||
|
||||
/**
|
||||
* Message to confirm the receipt of a persistent message (sent via a [[Channel]]).
|
||||
*/
|
||||
@SerialVersionUID(1L)
|
||||
private[persistence] case class Confirm(processorId: String, sequenceNr: Long, channelId: String)
|
||||
415
akka-persistence/src/main/scala/akka/persistence/Processor.scala
Normal file
415
akka-persistence/src/main/scala/akka/persistence/Processor.scala
Normal file
|
|
@ -0,0 +1,415 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.persistence
|
||||
|
||||
import akka.actor._
|
||||
import akka.dispatch._
|
||||
|
||||
/**
|
||||
* An actor that persists (journals) messages of type [[Persistent]]. Messages of other types are not persisted.
|
||||
*
|
||||
* {{{
|
||||
* import akka.persistence.{ Persistent, Processor }
|
||||
*
|
||||
* class MyProcessor extends Processor {
|
||||
* def receive = {
|
||||
* case Persistent(payload, sequenceNr) => // message has been written to journal
|
||||
* case other => // message has not been written to journal
|
||||
* }
|
||||
* }
|
||||
*
|
||||
* val processor = actorOf(Props[MyProcessor], name = "myProcessor")
|
||||
*
|
||||
* processor ! Persistent("foo")
|
||||
* processor ! "bar"
|
||||
* }}}
|
||||
*
|
||||
*
|
||||
* During start and restart, persistent messages are replayed to a processor so that it can recover internal
|
||||
* state from these messages. New messages sent to a processor during recovery do not interfere with replayed
|
||||
* messages, hence applications don't need to wait for a processor to complete its recovery.
|
||||
*
|
||||
* Automated recovery can be turned off or customized by overriding the [[preStartProcessor]] and
|
||||
* [[preRestartProcessor]] life cycle hooks. If automated recovery is turned off, an application can
|
||||
* explicitly recover a processor by sending it a [[Recover]] message.
|
||||
*
|
||||
* [[Persistent]] messages are assigned sequence numbers that are generated on a per-processor basis. A sequence
|
||||
* starts at `1L` and doesn't contain gaps unless a processor (logically) [[delete]]s a message.
|
||||
*
|
||||
* During recovery, a processor internally buffers new messages until recovery completes, so that new messages
|
||||
* do not interfere with replayed messages. This internal buffer (the ''processor stash'') is isolated from the
|
||||
* ''user stash'' inherited by `akka.actor.Stash`. `Processor` implementation classes can therefore use the
|
||||
* ''user stash'' for stashing/unstashing both persistent and transient messages.
|
||||
*
|
||||
* @see [[UntypedProcessor]]
|
||||
*/
|
||||
trait Processor extends Actor with Stash {
|
||||
import Journal._
|
||||
|
||||
private val extension = Persistence(context.system)
|
||||
private val _processorId = extension.processorId(self)
|
||||
|
||||
/**
|
||||
* Processor state.
|
||||
*/
|
||||
private trait State {
|
||||
/**
|
||||
* State-specific message handler.
|
||||
*/
|
||||
def aroundReceive(receive: Actor.Receive, message: Any): Unit
|
||||
|
||||
protected def process(receive: Actor.Receive, message: Any) =
|
||||
receive.applyOrElse(message, unhandled)
|
||||
|
||||
protected def processPersistent(receive: Actor.Receive, persistent: Persistent) = try {
|
||||
_currentPersistent = persistent
|
||||
updateLastSequenceNr(persistent)
|
||||
receive.applyOrElse(persistent, unhandled)
|
||||
} finally _currentPersistent = null
|
||||
|
||||
protected def updateLastSequenceNr(persistent: Persistent) {
|
||||
if (persistent.sequenceNr > _lastSequenceNr) _lastSequenceNr = persistent.sequenceNr
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Initial state, waits for `Recover` request, then changes to `recoveryStarted`.
|
||||
*/
|
||||
private val recoveryPending = new State {
|
||||
override def toString: String = "recovery pending"
|
||||
|
||||
def aroundReceive(receive: Actor.Receive, message: Any): Unit = message match {
|
||||
case Recover(toSnr) ⇒ {
|
||||
_currentState = recoveryStarted
|
||||
journal ! Replay(toSnr, self, processorId)
|
||||
}
|
||||
case _ ⇒ stashInternal()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Processes replayed messages. Changes to `recoverySucceeded` if all replayed
|
||||
* messages have been successfully processed, otherwise, changes to `recoveryFailed`.
|
||||
* In case of a failure, the exception is caught and stored for being thrown later
|
||||
* in `prepareRestart`.
|
||||
*/
|
||||
private val recoveryStarted = new State {
|
||||
override def toString: String = "recovery started"
|
||||
|
||||
def aroundReceive(receive: Actor.Receive, message: Any) = message match {
|
||||
case Replayed(p) ⇒ try { processPersistent(receive, p) } catch {
|
||||
case t: Throwable ⇒ {
|
||||
_currentState = recoveryFailed // delay throwing exception to prepareRestart
|
||||
_recoveryFailureReason = t
|
||||
_recoveryFailureMessage = currentEnvelope
|
||||
}
|
||||
}
|
||||
case RecoveryEnd(maxSnr) ⇒ {
|
||||
_currentState = recoverySucceeded
|
||||
_sequenceNr = maxSnr
|
||||
unstashAllInternal()
|
||||
}
|
||||
case Recover(_) ⇒ // ignore
|
||||
case _ ⇒ stashInternal()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Journals and processes new messages, both persistent and transient.
|
||||
*/
|
||||
private val recoverySucceeded = new State {
|
||||
override def toString: String = "recovery finished"
|
||||
|
||||
def aroundReceive(receive: Actor.Receive, message: Any) = message match {
|
||||
case Recover(_) ⇒ // ignore
|
||||
case Replayed(p) ⇒ processPersistent(receive, p) // can occur after unstash from user stash
|
||||
case Written(p) ⇒ processPersistent(receive, p)
|
||||
case Looped(p) ⇒ process(receive, p)
|
||||
case p: PersistentImpl ⇒ journal forward Write(p.copy(processorId = processorId, sequenceNr = nextSequenceNr()), self)
|
||||
case m ⇒ journal forward Loop(m, self)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Consumes remaining replayed messages and then changes to `prepareRestart`. The
|
||||
* message that caused the exception during replay, is re-added to the mailbox and
|
||||
* re-received in `prepareRestart`.
|
||||
*/
|
||||
private val recoveryFailed = new State {
|
||||
override def toString: String = "recovery failed"
|
||||
|
||||
def aroundReceive(receive: Actor.Receive, message: Any) = message match {
|
||||
case RecoveryEnd(maxSnr) ⇒ {
|
||||
_currentState = prepareRestart
|
||||
mailbox.enqueueFirst(self, _recoveryFailureMessage)
|
||||
}
|
||||
case Replayed(p) ⇒ updateLastSequenceNr(p)
|
||||
case _ ⇒ // ignore
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Re-receives the replayed message that causes an exception during replay and throws
|
||||
* that exception.
|
||||
*/
|
||||
private val prepareRestart = new State {
|
||||
override def toString: String = "prepare restart"
|
||||
|
||||
def aroundReceive(receive: Actor.Receive, message: Any) = message match {
|
||||
case Replayed(_) ⇒ throw _recoveryFailureReason
|
||||
case _ ⇒ // ignore
|
||||
}
|
||||
}
|
||||
|
||||
private var _sequenceNr: Long = 0L
|
||||
private var _lastSequenceNr: Long = 0L
|
||||
|
||||
private var _currentPersistent: Persistent = _
|
||||
private var _currentState: State = recoveryPending
|
||||
|
||||
private var _recoveryFailureReason: Throwable = _
|
||||
private var _recoveryFailureMessage: Envelope = _
|
||||
|
||||
private lazy val journal: ActorRef = extension.journalFor(processorId)
|
||||
|
||||
/**
|
||||
* Processor id. Defaults to this processor's path and can be overridden.
|
||||
*/
|
||||
def processorId: String = _processorId
|
||||
|
||||
/**
|
||||
* Highest received sequence number so far or `0L` if this processor hasn't received
|
||||
* a persistent message yet. Usually equal to the sequence number of `currentPersistentMessage`
|
||||
* (unless a processor implementation is about to re-order persistent messages using
|
||||
* `stash()` and `unstash()`).
|
||||
*/
|
||||
def lastSequenceNr: Long = _lastSequenceNr
|
||||
|
||||
/**
|
||||
* Returns `true` if this processor is currently recovering.
|
||||
*/
|
||||
def recoveryRunning: Boolean =
|
||||
_currentState == recoveryStarted ||
|
||||
_currentState == prepareRestart
|
||||
|
||||
/**
|
||||
* Returns `true` if this processor has successfully finished recovery.
|
||||
*/
|
||||
def recoveryFinished: Boolean =
|
||||
_currentState == recoverySucceeded
|
||||
|
||||
/**
|
||||
* Returns the current persistent message if there is one.
|
||||
*/
|
||||
implicit def currentPersistentMessage: Option[Persistent] = Option(_currentPersistent)
|
||||
|
||||
/**
|
||||
* Marks the `persistent` message as deleted. A message marked as deleted is not replayed during
|
||||
* recovery. This method is usually called inside `preRestartProcessor` when a persistent message
|
||||
* caused an exception. Processors that want to re-receive that persistent message during recovery
|
||||
* should not call this method.
|
||||
*/
|
||||
def delete(persistent: Persistent) {
|
||||
journal ! Delete(persistent)
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API.
|
||||
*/
|
||||
final override protected[akka] def aroundReceive(receive: Actor.Receive, message: Any): Unit = {
|
||||
_currentState.aroundReceive(receive, message)
|
||||
}
|
||||
|
||||
private def nextSequenceNr(): Long = {
|
||||
_sequenceNr += 1L
|
||||
_sequenceNr
|
||||
}
|
||||
|
||||
/**
|
||||
* User-overridable callback. Called when a processor is started. Default implementation sends
|
||||
* a `Recover()` to `self`.
|
||||
*/
|
||||
@throws(classOf[Exception])
|
||||
def preStartProcessor(): Unit = {
|
||||
self ! Recover()
|
||||
}
|
||||
|
||||
/**
|
||||
* User-overridable callback. Called when a processor is stopped. Empty default implementation.
|
||||
*/
|
||||
@throws(classOf[Exception])
|
||||
def postStopProcessor(): Unit = ()
|
||||
|
||||
/**
|
||||
* User-overridable callback. Called before a processor is restarted. Default implementation sends
|
||||
* a `Recover(lastSequenceNr)` message to `self` if `message` is defined, `Recover() otherwise`.
|
||||
*/
|
||||
@throws(classOf[Exception])
|
||||
def preRestartProcessor(reason: Throwable, message: Option[Any]): Unit = message match {
|
||||
case Some(_) ⇒ self ! Recover(lastSequenceNr)
|
||||
case None ⇒ self ! Recover()
|
||||
}
|
||||
|
||||
/**
|
||||
* User-overridable callback. Called after a processor has been restarted. Empty default implementation.
|
||||
*/
|
||||
@throws(classOf[Exception])
|
||||
def postRestartProcessor(reason: Throwable): Unit = ()
|
||||
|
||||
/**
|
||||
* Calls [[preStartProcessor]].
|
||||
*/
|
||||
override def preStart() {
|
||||
preStartProcessor()
|
||||
super.preStart()
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls [[postStopProcessor]] and unstashes all messages from the ''user stash'' that cannot be
|
||||
* replayed. The user stash is empty afterwards.
|
||||
*/
|
||||
override def postStop() {
|
||||
postStopProcessor()
|
||||
try unstashAll(unstashFilterPredicate) finally super.postStop()
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls [[preRestartDefault]] and then `super.preRestart()`. If processor implementation
|
||||
* classes want to opt out from stopping child actors, they should override this method and
|
||||
* call [[preRestartDefault]] only.
|
||||
*/
|
||||
override def preRestart(reason: Throwable, message: Option[Any]) {
|
||||
try preRestartDefault(reason, message) finally super.preRestart(reason, message)
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls [[preRestartProcessor]] and unstashes all messages from the ''user stash'' that cannot be
|
||||
* replayed. The user stash is empty afterwards.
|
||||
*/
|
||||
protected def preRestartDefault(reason: Throwable, message: Option[Any]) {
|
||||
message match {
|
||||
case Some(Written(m)) ⇒ preRestartProcessor(reason, Some(m))
|
||||
case Some(Looped(m)) ⇒ preRestartProcessor(reason, Some(m))
|
||||
case Some(Replayed(m)) ⇒ preRestartProcessor(reason, Some(m))
|
||||
case mo ⇒ preRestartProcessor(reason, None)
|
||||
}
|
||||
|
||||
unstashAll(unstashFilterPredicate)
|
||||
unstashAllInternal()
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls [[postRestartProcessor]].
|
||||
*/
|
||||
override def postRestart(reason: Throwable) {
|
||||
postRestartProcessor(reason)
|
||||
super.postRestart(reason)
|
||||
}
|
||||
|
||||
// -----------------------------------------------------
|
||||
// Processor-internal stash
|
||||
// -----------------------------------------------------
|
||||
|
||||
private def unstashFilterPredicate: Any ⇒ Boolean = {
|
||||
case _: Written ⇒ false
|
||||
case _: Replayed ⇒ false
|
||||
case _ ⇒ true
|
||||
}
|
||||
|
||||
private var processorStash = Vector.empty[Envelope]
|
||||
|
||||
private def stashInternal(): Unit = {
|
||||
processorStash :+= currentEnvelope
|
||||
}
|
||||
|
||||
private def unstashAllInternal(): Unit = try {
|
||||
val i = processorStash.reverseIterator
|
||||
while (i.hasNext) mailbox.enqueueFirst(self, i.next())
|
||||
} finally {
|
||||
processorStash = Vector.empty[Envelope]
|
||||
}
|
||||
|
||||
private def currentEnvelope: Envelope =
|
||||
context.asInstanceOf[ActorCell].currentMessage
|
||||
}
|
||||
|
||||
/**
|
||||
* Java API.
|
||||
*
|
||||
* An actor that persists (journals) messages of type [[Persistent]]. Messages of other types are not persisted.
|
||||
*
|
||||
* {{{
|
||||
* import akka.persistence.Persistent;
|
||||
* import akka.persistence.Processor;
|
||||
*
|
||||
* class MyProcessor extends UntypedProcessor {
|
||||
* public void onReceive(Object message) throws Exception {
|
||||
* if (message instanceof Persistent) {
|
||||
* // message has been written to journal
|
||||
* Persistent persistent = (Persistent)message;
|
||||
* Object payload = persistent.payload();
|
||||
* Long sequenceNr = persistent.sequenceNr();
|
||||
* // ...
|
||||
* } else {
|
||||
* // message has not been written to journal
|
||||
* }
|
||||
* }
|
||||
* }
|
||||
*
|
||||
* // ...
|
||||
*
|
||||
* ActorRef processor = getContext().actorOf(Props.create(MyProcessor.class), "myProcessor");
|
||||
*
|
||||
* processor.tell(Persistent.create("foo"), null);
|
||||
* processor.tell("bar", null);
|
||||
* }}}
|
||||
*
|
||||
* During start and restart, persistent messages are replayed to a processor so that it can recover internal
|
||||
* state from these messages. New messages sent to a processor during recovery do not interfere with replayed
|
||||
* messages, hence applications don't need to wait for a processor to complete its recovery.
|
||||
*
|
||||
* Automated recovery can be turned off or customized by overriding the [[preStartProcessor]] and
|
||||
* [[preRestartProcessor]] life cycle hooks. If automated recovery is turned off, an application can
|
||||
* explicitly recover a processor by sending it a [[Recover]] message.
|
||||
*
|
||||
* [[Persistent]] messages are assigned sequence numbers that are generated on a per-processor basis. A sequence
|
||||
* starts at `1L` and doesn't contain gaps unless a processor (logically) [[delete]]s a message.
|
||||
*
|
||||
* During recovery, a processor internally buffers new messages until recovery completes, so that new messages
|
||||
* do not interfere with replayed messages. This internal buffer (the ''processor stash'') is isolated from the
|
||||
* ''user stash'' inherited by `akka.actor.Stash`. `Processor` implementation classes can therefore use the
|
||||
* ''user stash'' for stashing/unstashing both persistent and transient messages.
|
||||
*
|
||||
* @see [[Processor]]
|
||||
*/
|
||||
abstract class UntypedProcessor extends UntypedActor with Processor {
|
||||
|
||||
/**
|
||||
* Java API.
|
||||
*
|
||||
* Returns the current persistent message or `null` if there is none.
|
||||
*/
|
||||
def getCurrentPersistentMessage = currentPersistentMessage.getOrElse(null)
|
||||
}
|
||||
|
||||
/**
|
||||
* Recovery request for a [[Processor]].
|
||||
*
|
||||
* @param toSequenceNr upper sequence number bound (inclusive) for replayed messages.
|
||||
*/
|
||||
@SerialVersionUID(1L)
|
||||
case class Recover(toSequenceNr: Long = Long.MaxValue)
|
||||
|
||||
object Recover {
|
||||
/**
|
||||
* Java API.
|
||||
*/
|
||||
def create() = Recover()
|
||||
|
||||
/**
|
||||
* Java API.
|
||||
*/
|
||||
def create(toSequenceNr: Long) = Recover(toSequenceNr)
|
||||
}
|
||||
|
|
@ -0,0 +1,76 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.persistence.journal
|
||||
|
||||
import com.typesafe.config.Config
|
||||
|
||||
import akka.actor._
|
||||
import akka.pattern.PromiseActorRef
|
||||
import akka.persistence._
|
||||
|
||||
/**
|
||||
* In-memory journal configuration object.
|
||||
*/
|
||||
private[persistence] class InmemJournalSettings(config: Config) extends JournalFactory {
|
||||
/**
|
||||
* Creates a new in-memory journal actor from this configuration object.
|
||||
*/
|
||||
def createJournal(implicit factory: ActorRefFactory): ActorRef = factory.actorOf(Props(classOf[InmemJournal], this))
|
||||
}
|
||||
|
||||
/**
|
||||
* In-memory journal.
|
||||
*/
|
||||
private[persistence] class InmemJournal(settings: InmemJournalSettings) extends Actor {
|
||||
// processorId => (message, sender, deleted)
|
||||
private var messages = Map.empty[String, List[(PersistentImpl, ActorRef, Boolean)]]
|
||||
// (processorId, sequenceNr) => confirming channels ids
|
||||
private var confirms = Map.empty[(String, Long), List[String]]
|
||||
|
||||
import Journal._
|
||||
|
||||
def receive = {
|
||||
case Write(pm, p) ⇒ {
|
||||
// must be done because PromiseActorRef instances have no uid set TODO: discuss
|
||||
val ps = if (sender.isInstanceOf[PromiseActorRef]) context.system.deadLetters else sender
|
||||
messages = messages + (messages.get(pm.processorId) match {
|
||||
case None ⇒ pm.processorId -> List((pm.copy(resolved = false), ps, false))
|
||||
case Some(mss) ⇒ pm.processorId -> ((pm.copy(resolved = false), ps, false) :: mss)
|
||||
})
|
||||
p.tell(Written(pm), sender)
|
||||
}
|
||||
case c @ Confirm(pid, snr, cid) ⇒ {
|
||||
val pair = (pid, snr)
|
||||
confirms = confirms + (confirms.get(pair) match {
|
||||
case None ⇒ pair -> List(cid)
|
||||
case Some(cids) ⇒ pair -> (cid :: cids)
|
||||
})
|
||||
// TODO: turn off by default and allow to turn on by configuration
|
||||
context.system.eventStream.publish(c)
|
||||
}
|
||||
case Delete(pm: PersistentImpl) ⇒ {
|
||||
val pid = pm.processorId
|
||||
val snr = pm.sequenceNr
|
||||
messages = messages map {
|
||||
case (`pid`, mss) ⇒ pid -> (mss map {
|
||||
case (msg, sdr, _) if msg.sequenceNr == snr ⇒ (msg, sdr, true)
|
||||
case ms ⇒ ms
|
||||
})
|
||||
case kv ⇒ kv
|
||||
}
|
||||
}
|
||||
case Loop(m, p) ⇒ {
|
||||
p.tell(Looped(m), sender)
|
||||
}
|
||||
case Replay(toSnr, p, pid) ⇒ {
|
||||
val cfs = confirms.withDefaultValue(Nil)
|
||||
for {
|
||||
mss ← messages.get(pid)
|
||||
(msg, sdr, del) ← mss.reverseIterator.filter(_._1.sequenceNr <= toSnr)
|
||||
} if (!del) p.tell(Replayed(msg.copy(confirms = cfs((msg.processorId, msg.sequenceNr)))), sdr)
|
||||
p.tell(RecoveryEnd(messages.get(pid).map(_.head._1.sequenceNr).getOrElse(0L)), self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,278 @@
|
|||
/**
|
||||
* Copyright (C) 2012-2013 Eligotech BV.
|
||||
*/
|
||||
|
||||
package akka.persistence.journal
|
||||
|
||||
import java.io.File
|
||||
import java.nio.ByteBuffer
|
||||
|
||||
import com.typesafe.config.Config
|
||||
|
||||
import org.iq80.leveldb._
|
||||
|
||||
import akka.actor._
|
||||
import akka.pattern.PromiseActorRef
|
||||
import akka.persistence._
|
||||
import akka.serialization.{ Serialization, SerializationExtension }
|
||||
|
||||
/**
|
||||
* LevelDB journal configuration object.
|
||||
*/
|
||||
private[persistence] class LeveldbJournalSettings(config: Config) extends JournalFactory {
|
||||
/**
|
||||
* Name of directory where journal files shall be stored. Can be a relative or absolute path.
|
||||
*/
|
||||
val dir: String = config.getString("dir")
|
||||
|
||||
/**
|
||||
* Currently `false`.
|
||||
*/
|
||||
val checksum = false
|
||||
|
||||
/**
|
||||
* Currently `false`.
|
||||
*/
|
||||
val fsync = false
|
||||
|
||||
/**
|
||||
* Creates a new LevelDB journal actor from this configuration object.
|
||||
*/
|
||||
def createJournal(implicit factory: ActorRefFactory): ActorRef =
|
||||
factory.actorOf(Props(classOf[LeveldbJournal], this).withDispatcher("akka.persistence.journal.leveldb.dispatcher"))
|
||||
}
|
||||
|
||||
/**
|
||||
* LevelDB journal.
|
||||
*/
|
||||
private[persistence] class LeveldbJournal(settings: LeveldbJournalSettings) extends Actor {
|
||||
// TODO: support migration of processor and channel ids
|
||||
// needed if default processor and channel ids are used
|
||||
// (actor paths, which contain deployment information).
|
||||
|
||||
val leveldbOptions = new Options().createIfMissing(true).compressionType(CompressionType.NONE)
|
||||
val levelDbReadOptions = new ReadOptions().verifyChecksums(settings.checksum)
|
||||
val levelDbWriteOptions = new WriteOptions().sync(settings.fsync)
|
||||
|
||||
val leveldbFactory = org.iq80.leveldb.impl.Iq80DBFactory.factory
|
||||
var leveldb: DB = _
|
||||
|
||||
val numericIdOffset = 10
|
||||
var pathMap: Map[String, Int] = Map.empty
|
||||
|
||||
// TODO: use protobuf serializer for PersistentImpl
|
||||
// TODO: use user-defined serializer for payload
|
||||
val serializer = SerializationExtension(context.system).findSerializerFor("")
|
||||
|
||||
import LeveldbJournal._
|
||||
import Journal._
|
||||
|
||||
def receive = {
|
||||
case Write(pm, p) ⇒ {
|
||||
val sm = withBatch { batch ⇒
|
||||
// must be done because PromiseActorRef instances have no uid set TODO: discuss
|
||||
val ps = if (sender.isInstanceOf[PromiseActorRef]) context.system.deadLetters else sender
|
||||
val sm = pm.copy(sender = Serialization.serializedActorPath(ps))
|
||||
val pid = numericId(sm.processorId)
|
||||
batch.put(keyToBytes(counterKey(pid)), counterToBytes(sm.sequenceNr))
|
||||
batch.put(keyToBytes(Key(pid, sm.sequenceNr, 0)), msgToBytes(sm.copy(resolved = false, confirmTarget = null, confirmMessage = null)))
|
||||
sm
|
||||
}
|
||||
p.tell(Written(sm), sender)
|
||||
}
|
||||
case c @ Confirm(pid, snr, cid) ⇒ {
|
||||
leveldb.put(keyToBytes(Key(numericId(pid), snr, numericId(cid))), cid.getBytes("UTF-8"))
|
||||
// TODO: turn off by default and allow to turn on by configuration
|
||||
context.system.eventStream.publish(c)
|
||||
}
|
||||
case Delete(pm: PersistentImpl) ⇒ {
|
||||
leveldb.put(keyToBytes(deletionKey(numericId(pm.processorId), pm.sequenceNr)), Array.empty[Byte])
|
||||
}
|
||||
case Loop(m, p) ⇒ {
|
||||
p.tell(Looped(m), sender)
|
||||
}
|
||||
case Replay(toSnr, p, pid) ⇒ {
|
||||
val options = levelDbReadOptions.snapshot(leveldb.getSnapshot)
|
||||
val iter = leveldb.iterator(options)
|
||||
val maxSnr = leveldb.get(keyToBytes(counterKey(numericId(pid))), options) match {
|
||||
case null ⇒ 0L
|
||||
case bytes ⇒ counterFromBytes(bytes)
|
||||
}
|
||||
context.actorOf(Props(classOf[LeveldbReplay], msgFromBytes _)) ! LeveldbReplay.Replay(toSnr, maxSnr, p, numericId(pid), iter)
|
||||
}
|
||||
}
|
||||
|
||||
private def msgToBytes(m: PersistentImpl): Array[Byte] = serializer.toBinary(m)
|
||||
private def msgFromBytes(a: Array[Byte]): PersistentImpl = serializer.fromBinary(a).asInstanceOf[PersistentImpl]
|
||||
|
||||
// ----------------------------------------------------------
|
||||
// Path mapping
|
||||
// ----------------------------------------------------------
|
||||
|
||||
private def numericId(processorId: String): Int = pathMap.get(processorId) match {
|
||||
case None ⇒ writePathMapping(processorId, pathMap.size + numericIdOffset)
|
||||
case Some(v) ⇒ v
|
||||
}
|
||||
|
||||
private def readPathMap(): Map[String, Int] = {
|
||||
val iter = leveldb.iterator(levelDbReadOptions.snapshot(leveldb.getSnapshot))
|
||||
try {
|
||||
iter.seek(keyToBytes(idToKey(numericIdOffset)))
|
||||
readPathMap(Map.empty, iter)
|
||||
} finally {
|
||||
iter.close()
|
||||
}
|
||||
}
|
||||
|
||||
private def readPathMap(pathMap: Map[String, Int], iter: DBIterator): Map[String, Int] = {
|
||||
if (!iter.hasNext) pathMap else {
|
||||
val nextEntry = iter.next()
|
||||
val nextKey = keyFromBytes(nextEntry.getKey)
|
||||
if (!isMappingKey(nextKey)) pathMap else {
|
||||
val nextVal = new String(nextEntry.getValue, "UTF-8")
|
||||
readPathMap(pathMap + (nextVal -> idFromKey(nextKey)), iter)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private def writePathMapping(path: String, numericId: Int): Int = {
|
||||
pathMap = pathMap + (path -> numericId)
|
||||
leveldb.put(keyToBytes(idToKey(numericId)), path.getBytes("UTF-8"))
|
||||
numericId
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------
|
||||
// Batch write support
|
||||
// ----------------------------------------------------------
|
||||
|
||||
def withBatch[R](body: WriteBatch ⇒ R): R = {
|
||||
val batch = leveldb.createWriteBatch()
|
||||
try {
|
||||
val r = body(batch)
|
||||
leveldb.write(batch, levelDbWriteOptions)
|
||||
r
|
||||
} finally {
|
||||
batch.close()
|
||||
}
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------
|
||||
// Life cycle
|
||||
// ----------------------------------------------------------
|
||||
|
||||
override def preStart() {
|
||||
leveldb = leveldbFactory.open(new File(settings.dir), leveldbOptions)
|
||||
pathMap = readPathMap()
|
||||
}
|
||||
|
||||
override def postStop() {
|
||||
leveldb.close()
|
||||
}
|
||||
}
|
||||
|
||||
private object LeveldbJournal {
|
||||
case class Key(
|
||||
processorId: Int,
|
||||
sequenceNr: Long,
|
||||
channelId: Int)
|
||||
|
||||
def idToKey(id: Int) = Key(1, 0L, id)
|
||||
def idFromKey(key: Key) = key.channelId
|
||||
|
||||
def counterKey(processorId: Int): Key = Key(processorId, 0L, 0)
|
||||
def isMappingKey(key: Key): Boolean = key.processorId == 1
|
||||
|
||||
def deletionKey(processorId: Int, sequenceNr: Long): Key = Key(processorId, sequenceNr, 1)
|
||||
def isDeletionKey(key: Key): Boolean = key.channelId == 1
|
||||
|
||||
def counterToBytes(ctr: Long): Array[Byte] = ByteBuffer.allocate(8).putLong(ctr).array
|
||||
def counterFromBytes(bytes: Array[Byte]): Long = ByteBuffer.wrap(bytes).getLong
|
||||
|
||||
def keyToBytes(key: Key): Array[Byte] = {
|
||||
val bb = ByteBuffer.allocate(20)
|
||||
bb.putInt(key.processorId)
|
||||
bb.putLong(key.sequenceNr)
|
||||
bb.putInt(key.channelId)
|
||||
bb.array
|
||||
}
|
||||
|
||||
def keyFromBytes(bytes: Array[Byte]): Key = {
|
||||
val bb = ByteBuffer.wrap(bytes)
|
||||
val aid = bb.getInt
|
||||
val snr = bb.getLong
|
||||
val cid = bb.getInt
|
||||
new Key(aid, snr, cid)
|
||||
}
|
||||
}
|
||||
|
||||
private class LeveldbReplay(deserialize: Array[Byte] ⇒ PersistentImpl) extends Actor {
|
||||
val extension = Persistence(context.system)
|
||||
|
||||
import LeveldbReplay._
|
||||
import LeveldbJournal._
|
||||
import Journal.{ Replayed, RecoveryEnd }
|
||||
|
||||
// TODO: parent should stop replay actor if it crashes
|
||||
// TODO: use a pinned dispatcher
|
||||
|
||||
def receive = {
|
||||
case Replay(toSnr, maxSnr, processor, processorId, iter) ⇒ {
|
||||
try {
|
||||
val startKey = Key(processorId, 1L, 0)
|
||||
iter.seek(keyToBytes(startKey))
|
||||
replay(iter, startKey, toSnr, m ⇒ processor.tell(Replayed(m), extension.system.provider.resolveActorRef(m.sender)))
|
||||
} finally { iter.close() }
|
||||
processor.tell(RecoveryEnd(maxSnr), self)
|
||||
context.stop(self)
|
||||
}
|
||||
}
|
||||
|
||||
@scala.annotation.tailrec
|
||||
private def replay(iter: DBIterator, key: Key, toSnr: Long, callback: PersistentImpl ⇒ Unit) {
|
||||
if (iter.hasNext) {
|
||||
val nextEntry = iter.next()
|
||||
val nextKey = keyFromBytes(nextEntry.getKey)
|
||||
if (nextKey.sequenceNr > toSnr) {
|
||||
// end iteration here
|
||||
} else if (nextKey.channelId != 0) {
|
||||
// phantom confirmation (just advance iterator)
|
||||
replay(iter, nextKey, toSnr, callback)
|
||||
} else if (key.processorId == nextKey.processorId) {
|
||||
val msg = deserialize(nextEntry.getValue)
|
||||
val del = deletion(iter, nextKey)
|
||||
val cnf = confirms(iter, nextKey, Nil)
|
||||
if (!del) callback(msg.copy(confirms = cnf))
|
||||
replay(iter, nextKey, toSnr, callback)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private def deletion(iter: DBIterator, key: Key): Boolean = {
|
||||
if (iter.hasNext) {
|
||||
val nextEntry = iter.peekNext()
|
||||
val nextKey = keyFromBytes(nextEntry.getKey)
|
||||
if (key.processorId == nextKey.processorId && key.sequenceNr == nextKey.sequenceNr && isDeletionKey(nextKey)) {
|
||||
iter.next()
|
||||
true
|
||||
} else false
|
||||
} else false
|
||||
}
|
||||
|
||||
@scala.annotation.tailrec
|
||||
private def confirms(iter: DBIterator, key: Key, channelIds: List[String]): List[String] = {
|
||||
if (iter.hasNext) {
|
||||
val nextEntry = iter.peekNext()
|
||||
val nextKey = keyFromBytes(nextEntry.getKey)
|
||||
if (key.processorId == nextKey.processorId && key.sequenceNr == nextKey.sequenceNr) {
|
||||
val nextValue = new String(nextEntry.getValue, "UTF-8")
|
||||
iter.next()
|
||||
confirms(iter, nextKey, nextValue :: channelIds)
|
||||
} else channelIds
|
||||
} else channelIds
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private object LeveldbReplay {
|
||||
case class Replay(toSequenceNr: Long, maxSequenceNr: Long, processor: ActorRef, pid: Int, iterator: DBIterator)
|
||||
}
|
||||
|
|
@ -0,0 +1,129 @@
|
|||
package akka.persistence
|
||||
|
||||
import akka.actor._
|
||||
import akka.testkit._
|
||||
|
||||
object ChannelSpec {
|
||||
val config =
|
||||
"""
|
||||
|serialize-creators = on
|
||||
|serialize-messages = on
|
||||
|akka.persistence.journal.leveldb.dir = "target/journal-channel-spec"
|
||||
""".stripMargin
|
||||
|
||||
class TestProcessor extends Processor {
|
||||
val destination = context.actorOf(Props[TestDestination])
|
||||
val channel = context.actorOf(Channel.props(), "channel")
|
||||
|
||||
def receive = {
|
||||
case m @ Persistent(s: String, _) if s.startsWith("a") ⇒ {
|
||||
// forward to destination via channel,
|
||||
// destination replies to initial sender
|
||||
channel forward Deliver(m.withPayload(s"fw: ${s}"), destination)
|
||||
}
|
||||
case m @ Persistent(s: String, _) if s.startsWith("b") ⇒ {
|
||||
// reply to sender via channel
|
||||
channel ! Deliver(m.withPayload(s"re: ${s}"), sender)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class TestDestination extends Actor {
|
||||
def receive = {
|
||||
case m: Persistent ⇒ sender ! m
|
||||
}
|
||||
}
|
||||
|
||||
class TestReceiver(testActor: ActorRef) extends Actor {
|
||||
def receive = {
|
||||
case Persistent(payload, _) ⇒ testActor ! payload
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class ChannelSpec extends AkkaSpec(ChannelSpec.config) with PersistenceSpec with ImplicitSender {
|
||||
import ChannelSpec._
|
||||
|
||||
override protected def beforeEach() {
|
||||
super.beforeEach()
|
||||
|
||||
val confirmProbe = TestProbe()
|
||||
val forwardProbe = TestProbe()
|
||||
val replyProbe = TestProbe()
|
||||
|
||||
val processor = system.actorOf(Props[TestProcessor], name)
|
||||
|
||||
system.eventStream.subscribe(confirmProbe.ref, classOf[Confirm])
|
||||
|
||||
processor tell (Persistent("a1"), forwardProbe.ref)
|
||||
processor tell (Persistent("b1"), replyProbe.ref)
|
||||
|
||||
forwardProbe.expectMsgPF() { case m @ Persistent("fw: a1", _) ⇒ m.confirm() }
|
||||
replyProbe.expectMsgPF() { case m @ Persistent("re: b1", _) ⇒ m.confirm() }
|
||||
|
||||
// wait for confirmations to be stored by journal (needed
|
||||
// for replay so that channels can drop confirmed messages)
|
||||
confirmProbe.expectMsgType[Confirm]
|
||||
confirmProbe.expectMsgType[Confirm]
|
||||
|
||||
stopAndAwaitTermination(processor)
|
||||
}
|
||||
|
||||
"A channel" must {
|
||||
"forward un-confirmed messages to destination" in {
|
||||
val processor = system.actorOf(Props[TestProcessor], name)
|
||||
processor ! Persistent("a2")
|
||||
expectMsgPF() { case m @ Persistent("fw: a2", _) ⇒ m.confirm() }
|
||||
}
|
||||
"reply un-confirmed messages to senders" in {
|
||||
val processor = system.actorOf(Props[TestProcessor], name)
|
||||
processor ! Persistent("b2")
|
||||
expectMsgPF() { case m @ Persistent("re: b2", _) ⇒ m.confirm() }
|
||||
}
|
||||
"must resolve sender references and preserve message order" in {
|
||||
val channel = system.actorOf(Channel.props(), "testChannel1")
|
||||
val destination = system.actorOf(Props[TestDestination])
|
||||
val sender1 = system.actorOf(Props(classOf[TestReceiver], testActor), "testSender")
|
||||
|
||||
channel tell (Deliver(Persistent("a"), destination), sender1)
|
||||
expectMsg("a")
|
||||
stopAndAwaitTermination(sender1)
|
||||
|
||||
// create new incarnation of sender (with same actor path)
|
||||
val sender2 = system.actorOf(Props(classOf[TestReceiver], testActor), "testSender")
|
||||
|
||||
// replayed message (resolved = false) and invalid sender reference
|
||||
channel tell (Deliver(PersistentImpl("a", resolved = false), destination, Resolve.Sender), sender1)
|
||||
|
||||
// new messages (resolved = true) and valid sender references
|
||||
channel tell (Deliver(Persistent("b"), destination), sender2)
|
||||
channel tell (Deliver(Persistent("c"), destination), sender2)
|
||||
|
||||
expectMsg("a")
|
||||
expectMsg("b")
|
||||
expectMsg("c")
|
||||
}
|
||||
"must resolve destination references and preserve message order" in {
|
||||
val channel = system.actorOf(Channel.props(), "testChannel2")
|
||||
val destination1 = system.actorOf(Props(classOf[TestReceiver], testActor), "testDestination")
|
||||
|
||||
channel ! Deliver(Persistent("a"), destination1)
|
||||
expectMsg("a")
|
||||
stopAndAwaitTermination(destination1)
|
||||
|
||||
// create new incarnation of destination (with same actor path)
|
||||
val destination2 = system.actorOf(Props(classOf[TestReceiver], testActor), "testDestination")
|
||||
|
||||
// replayed message (resolved = false) and invalid destination reference
|
||||
channel ! Deliver(PersistentImpl("a", resolved = false), destination1, Resolve.Destination)
|
||||
|
||||
// new messages (resolved = true) and valid destination references
|
||||
channel ! Deliver(Persistent("b"), destination2)
|
||||
channel ! Deliver(Persistent("c"), destination2)
|
||||
|
||||
expectMsg("a")
|
||||
expectMsg("b")
|
||||
expectMsg("c")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,46 @@
|
|||
package akka.persistence
|
||||
|
||||
import java.io.File
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
import org.apache.commons.io.FileUtils
|
||||
import org.scalatest.BeforeAndAfterEach
|
||||
|
||||
import akka.actor.ActorRef
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.testkit.TestActor.Watch
|
||||
|
||||
trait PersistenceSpec extends BeforeAndAfterEach { this: AkkaSpec ⇒
|
||||
private var _name: String = _
|
||||
|
||||
val extension = Persistence(system)
|
||||
val counter = new AtomicInteger(0)
|
||||
|
||||
/**
|
||||
* Unique name per test.
|
||||
*/
|
||||
def name = _name
|
||||
|
||||
/**
|
||||
* Prefix for generating a unique name per test.
|
||||
*/
|
||||
def namePrefix: String = "processor"
|
||||
|
||||
protected def stopAndAwaitTermination(ref: ActorRef) {
|
||||
testActor ! Watch(ref)
|
||||
system.stop(ref)
|
||||
expectTerminated(ref)
|
||||
}
|
||||
|
||||
override protected def beforeEach() {
|
||||
_name = namePrefix + counter.incrementAndGet()
|
||||
}
|
||||
|
||||
override protected def afterTermination() {
|
||||
FileUtils.deleteDirectory(new File(system.settings.config.getString("akka.persistence.journal.leveldb.dir")))
|
||||
}
|
||||
}
|
||||
|
||||
trait TurnOffRecoverOnStart { this: Processor ⇒
|
||||
override def preStartProcessor(): Unit = ()
|
||||
}
|
||||
|
|
@ -0,0 +1,303 @@
|
|||
package akka.persistence
|
||||
|
||||
import akka.actor._
|
||||
import akka.testkit._
|
||||
|
||||
object ProcessorSpec {
|
||||
val config =
|
||||
"""
|
||||
|serialize-creators = on
|
||||
|serialize-messages = on
|
||||
|akka.persistence.journal.leveldb.dir = "target/journal-processor-spec"
|
||||
""".stripMargin
|
||||
|
||||
case object GetState
|
||||
|
||||
class RecoverTestProcessor extends Processor {
|
||||
var state = List.empty[String]
|
||||
def receive = {
|
||||
case "boom" ⇒ throw new Exception("boom")
|
||||
case Persistent("boom", _) ⇒ throw new Exception("boom")
|
||||
case Persistent(payload, snr) ⇒ state = s"${payload}-${snr}" :: state
|
||||
case GetState ⇒ sender ! state.reverse
|
||||
}
|
||||
|
||||
override def preRestartProcessor(reason: Throwable, message: Option[Any]) = {
|
||||
message match {
|
||||
case Some(m: Persistent) ⇒ delete(m) // delete message from journal
|
||||
case _ ⇒ // ignore
|
||||
}
|
||||
super.preRestartProcessor(reason, message)
|
||||
}
|
||||
}
|
||||
|
||||
class RecoverOffTestProcessor extends RecoverTestProcessor with TurnOffRecoverOnStart
|
||||
|
||||
class StoredSenderTestProcessor extends Processor {
|
||||
def receive = {
|
||||
case Persistent(payload, _) ⇒ sender ! payload
|
||||
}
|
||||
}
|
||||
|
||||
class RecoveryStatusTestProcessor extends Processor {
|
||||
def receive = {
|
||||
case Persistent("c", _) if !recoveryRunning ⇒ sender ! "c"
|
||||
case Persistent(payload, _) if recoveryRunning ⇒ sender ! payload
|
||||
}
|
||||
}
|
||||
|
||||
class BehaviorChangeTestProcessor extends Processor {
|
||||
val acceptA: Actor.Receive = {
|
||||
case Persistent("a", _) ⇒ {
|
||||
sender ! "a"
|
||||
context.become(acceptB)
|
||||
}
|
||||
}
|
||||
|
||||
val acceptB: Actor.Receive = {
|
||||
case Persistent("b", _) ⇒ {
|
||||
sender ! "b"
|
||||
context.become(acceptA)
|
||||
}
|
||||
}
|
||||
|
||||
def receive = acceptA
|
||||
}
|
||||
|
||||
class FsmTestProcessor extends Processor with FSM[String, Int] {
|
||||
startWith("closed", 0)
|
||||
|
||||
when("closed") {
|
||||
case Event(Persistent("a", _), counter) ⇒ {
|
||||
goto("open") using (counter + 1) replying (counter)
|
||||
}
|
||||
}
|
||||
|
||||
when("open") {
|
||||
case Event(Persistent("b", _), counter) ⇒ {
|
||||
goto("closed") using (counter + 1) replying (counter)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class OutboundMessageTestProcessor extends Processor {
|
||||
def receive = {
|
||||
case Persistent(payload, snr) ⇒ sender ! Persistent(snr)
|
||||
}
|
||||
}
|
||||
|
||||
class ResumeTestException extends Exception("test")
|
||||
|
||||
class ResumeTestSupervisor extends Actor {
|
||||
val processor = context.actorOf(Props[ResumeTestProcessor], "processor")
|
||||
|
||||
override val supervisorStrategy =
|
||||
OneForOneStrategy() {
|
||||
case _: ResumeTestException ⇒ SupervisorStrategy.Resume
|
||||
}
|
||||
|
||||
def receive = {
|
||||
case m ⇒ processor forward m
|
||||
}
|
||||
}
|
||||
|
||||
class ResumeTestProcessor extends Processor {
|
||||
var state: List[String] = Nil
|
||||
def receive = {
|
||||
case "boom" ⇒ throw new ResumeTestException
|
||||
case Persistent(payload, snr) ⇒ state = s"${payload}-${snr}" :: state
|
||||
case GetState ⇒ sender ! state.reverse
|
||||
}
|
||||
}
|
||||
|
||||
class LastReplayedMsgFailsTestProcessor extends RecoverTestProcessor {
|
||||
override def preRestartProcessor(reason: Throwable, message: Option[Any]) = {
|
||||
message match {
|
||||
case Some(m: Persistent) ⇒ if (recoveryRunning) delete(m)
|
||||
case _ ⇒
|
||||
}
|
||||
super.preRestartProcessor(reason, message)
|
||||
}
|
||||
}
|
||||
|
||||
class AnyReplayedMsgFailsTestProcessor extends RecoverTestProcessor {
|
||||
val failOnReplayedA: Actor.Receive = {
|
||||
case Persistent("a", _) if recoveryRunning ⇒ throw new Exception("boom")
|
||||
}
|
||||
|
||||
override def receive = failOnReplayedA orElse super.receive
|
||||
}
|
||||
}
|
||||
|
||||
class ProcessorSpec extends AkkaSpec(ProcessorSpec.config) with PersistenceSpec with ImplicitSender {
|
||||
import ProcessorSpec._
|
||||
|
||||
override protected def beforeEach() {
|
||||
super.beforeEach()
|
||||
|
||||
val processor = system.actorOf(Props[RecoverTestProcessor], name)
|
||||
processor ! Persistent("a")
|
||||
processor ! Persistent("b")
|
||||
processor ! GetState
|
||||
expectMsg(List("a-1", "b-2"))
|
||||
stopAndAwaitTermination(processor)
|
||||
}
|
||||
|
||||
"A processor" must {
|
||||
"recover state on explicit request" in {
|
||||
val processor = system.actorOf(Props[RecoverOffTestProcessor], name)
|
||||
processor ! Recover()
|
||||
processor ! GetState
|
||||
expectMsg(List("a-1", "b-2"))
|
||||
}
|
||||
"recover state automatically" in {
|
||||
val processor = system.actorOf(Props[RecoverTestProcessor], name)
|
||||
processor ! GetState
|
||||
expectMsg(List("a-1", "b-2"))
|
||||
}
|
||||
"recover state automatically on restart" in {
|
||||
val processor = system.actorOf(Props[RecoverTestProcessor], name)
|
||||
processor ! "boom"
|
||||
processor ! GetState
|
||||
expectMsg(List("a-1", "b-2"))
|
||||
}
|
||||
"buffer new messages until recovery completed" in {
|
||||
val processor = system.actorOf(Props[RecoverOffTestProcessor], name)
|
||||
processor ! Persistent("c")
|
||||
processor ! Recover()
|
||||
processor ! Persistent("d")
|
||||
processor ! GetState
|
||||
expectMsg(List("a-1", "b-2", "c-3", "d-4"))
|
||||
}
|
||||
"ignore redundant recovery requests" in {
|
||||
val processor = system.actorOf(Props[RecoverOffTestProcessor], name)
|
||||
processor ! Persistent("c")
|
||||
processor ! Recover()
|
||||
processor ! Persistent("d")
|
||||
processor ! Recover()
|
||||
processor ! Persistent("e")
|
||||
processor ! GetState
|
||||
expectMsg(List("a-1", "b-2", "c-3", "d-4", "e-5"))
|
||||
}
|
||||
"buffer new messages until restart-recovery completed" in {
|
||||
val processor = system.actorOf(Props[RecoverTestProcessor], name)
|
||||
processor ! "boom"
|
||||
processor ! Persistent("c")
|
||||
processor ! Persistent("d")
|
||||
processor ! GetState
|
||||
expectMsg(List("a-1", "b-2", "c-3", "d-4"))
|
||||
}
|
||||
"allow deletion of journaled messages on failure" in {
|
||||
val processor = system.actorOf(Props[RecoverTestProcessor], name)
|
||||
processor ! Persistent("boom") // journaled message causes failure and will be deleted
|
||||
processor ! GetState
|
||||
expectMsg(List("a-1", "b-2"))
|
||||
}
|
||||
"allow deletion of journaled messages on failure and buffer new messages until restart-recovery completed" in {
|
||||
val processor = system.actorOf(Props[RecoverTestProcessor], name)
|
||||
processor ! Persistent("boom") // journaled message causes failure and will be deleted
|
||||
processor ! Persistent("c")
|
||||
processor ! Persistent("d")
|
||||
processor ! GetState
|
||||
expectMsg(List("a-1", "b-2", "c-4", "d-5")) // deleted message leaves gap in sequence
|
||||
}
|
||||
"store sender references and restore them for replayed messages" in {
|
||||
system.actorOf(Props[StoredSenderTestProcessor], name)
|
||||
List("a", "b") foreach (expectMsg(_))
|
||||
}
|
||||
"properly indicate its recovery status" in {
|
||||
val processor = system.actorOf(Props[RecoveryStatusTestProcessor], name)
|
||||
processor ! Persistent("c")
|
||||
List("a", "b", "c") foreach (expectMsg(_))
|
||||
}
|
||||
"continue journaling when changing behavior" in {
|
||||
val processor = system.actorOf(Props[BehaviorChangeTestProcessor], name)
|
||||
processor ! Persistent("a")
|
||||
processor ! Persistent("b")
|
||||
List("a", "b", "a", "b") foreach (expectMsg(_))
|
||||
}
|
||||
"derive outbound messages from the current message" in {
|
||||
val processor = system.actorOf(Props[OutboundMessageTestProcessor], name)
|
||||
processor ! Persistent("c")
|
||||
1 to 3 foreach { _ ⇒ expectMsgPF() { case Persistent(payload, snr) ⇒ payload must be(snr) } }
|
||||
}
|
||||
"support recovery with upper sequence number bound" in {
|
||||
val processor = system.actorOf(Props[RecoverOffTestProcessor], name)
|
||||
processor ! Recover(1L)
|
||||
processor ! GetState
|
||||
expectMsg(List("a-1"))
|
||||
}
|
||||
"never replace journaled messages" in {
|
||||
val processor1 = system.actorOf(Props[RecoverOffTestProcessor], name)
|
||||
processor1 ! Recover(1L)
|
||||
processor1 ! Persistent("c")
|
||||
processor1 ! GetState
|
||||
expectMsg(List("a-1", "c-3"))
|
||||
stopAndAwaitTermination(processor1)
|
||||
|
||||
val processor2 = system.actorOf(Props[RecoverOffTestProcessor], name)
|
||||
processor2 ! Recover()
|
||||
processor2 ! GetState
|
||||
expectMsg(List("a-1", "b-2", "c-3"))
|
||||
}
|
||||
"be able to skip restart recovery when being resumed" in {
|
||||
val supervisor1 = system.actorOf(Props[ResumeTestSupervisor], name)
|
||||
supervisor1 ! Persistent("a")
|
||||
supervisor1 ! Persistent("b")
|
||||
supervisor1 ! GetState
|
||||
expectMsg(List("a-1", "b-2"))
|
||||
stopAndAwaitTermination(supervisor1)
|
||||
|
||||
val supervisor2 = system.actorOf(Props[ResumeTestSupervisor], name)
|
||||
supervisor2 ! Persistent("c")
|
||||
supervisor2 ! "boom"
|
||||
supervisor2 ! Persistent("d")
|
||||
supervisor2 ! GetState
|
||||
expectMsg(List("a-1", "b-2", "c-3", "d-4"))
|
||||
stopAndAwaitTermination(supervisor2)
|
||||
|
||||
val supervisor3 = system.actorOf(Props[ResumeTestSupervisor], name)
|
||||
supervisor3 ! GetState
|
||||
expectMsg(List("a-1", "b-2", "c-3", "d-4"))
|
||||
}
|
||||
"be able to re-run restart recovery when it fails with last replayed message" in {
|
||||
val processor = system.actorOf(Props[LastReplayedMsgFailsTestProcessor], name)
|
||||
processor ! Persistent("c")
|
||||
processor ! Persistent("boom")
|
||||
processor ! Persistent("d")
|
||||
processor ! GetState
|
||||
expectMsg(List("a-1", "b-2", "c-3", "d-5"))
|
||||
}
|
||||
"be able to re-run initial recovery when it fails with a message that is not the last replayed message" in {
|
||||
val processor = system.actorOf(Props[AnyReplayedMsgFailsTestProcessor], name)
|
||||
processor ! Persistent("c")
|
||||
processor ! GetState
|
||||
expectMsg(List("b-2", "c-3"))
|
||||
}
|
||||
"be able to re-run restart recovery when it fails with a message that is not the last replayed message" in {
|
||||
val processor = system.actorOf(Props[AnyReplayedMsgFailsTestProcessor], "other") // new processor, no initial replay
|
||||
processor ! Persistent("b")
|
||||
processor ! Persistent("a")
|
||||
processor ! Persistent("c")
|
||||
processor ! Persistent("d")
|
||||
processor ! Persistent("e")
|
||||
processor ! Persistent("f")
|
||||
processor ! Persistent("g")
|
||||
processor ! Persistent("h")
|
||||
processor ! Persistent("i")
|
||||
processor ! "boom"
|
||||
processor ! Persistent("j")
|
||||
processor ! GetState
|
||||
expectMsg(List("b-1", "c-3", "d-4", "e-5", "f-6", "g-7", "h-8", "i-9", "j-10"))
|
||||
}
|
||||
}
|
||||
|
||||
"A processor" can {
|
||||
"be a finite state machine" in {
|
||||
val processor = system.actorOf(Props[FsmTestProcessor], name)
|
||||
processor ! Persistent("a")
|
||||
processor ! Persistent("b")
|
||||
List(0, 1, 2, 3) foreach (expectMsg(_))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,127 @@
|
|||
package akka.persistence
|
||||
|
||||
import akka.actor._
|
||||
import akka.testkit._
|
||||
|
||||
object ProcessorStashSpec {
|
||||
val config =
|
||||
"""
|
||||
|serialize-creators = on
|
||||
|serialize-messages = on
|
||||
|akka.persistence.journal.leveldb.dir = "target/journal-processor-stash-spec"
|
||||
""".stripMargin
|
||||
|
||||
case object GetState
|
||||
|
||||
class StashingProcessor extends Processor {
|
||||
var state: List[String] = Nil
|
||||
|
||||
val behaviorA: Actor.Receive = {
|
||||
case Persistent("a", snr) ⇒ update("a", snr); context.become(behaviorB)
|
||||
case Persistent("b", snr) ⇒ update("b", snr)
|
||||
case Persistent("c", snr) ⇒ update("c", snr); unstashAll()
|
||||
case "x" ⇒ update("x")
|
||||
case "boom" ⇒ throw new Exception("boom")
|
||||
case Persistent("boom", _) ⇒ throw new Exception("boom")
|
||||
case GetState ⇒ sender ! state.reverse
|
||||
}
|
||||
|
||||
val behaviorB: Actor.Receive = {
|
||||
case Persistent("b", _) ⇒ stash(); context.become(behaviorA)
|
||||
case "x" ⇒ stash()
|
||||
}
|
||||
|
||||
def receive = behaviorA
|
||||
|
||||
def update(payload: String, snr: Long = 0L) {
|
||||
state = s"${payload}-${snr}" :: state
|
||||
}
|
||||
}
|
||||
|
||||
class RecoveryFailureStashingProcessor extends StashingProcessor {
|
||||
override def preRestartProcessor(reason: Throwable, message: Option[Any]) = {
|
||||
message match {
|
||||
case Some(m: Persistent) ⇒ if (recoveryRunning) delete(m)
|
||||
case _ ⇒
|
||||
}
|
||||
super.preRestartProcessor(reason, message)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class ProcessorStashSpec extends AkkaSpec(ProcessorStashSpec.config) with PersistenceSpec with ImplicitSender {
|
||||
import ProcessorStashSpec._
|
||||
|
||||
"A processor" must {
|
||||
"support user stash and unstash operations for persistent messages" in {
|
||||
val p1 = system.actorOf(Props[StashingProcessor], name)
|
||||
p1 ! Persistent("a")
|
||||
p1 ! Persistent("b")
|
||||
p1 ! Persistent("c")
|
||||
p1 ! GetState
|
||||
expectMsg(List("a-1", "c-3", "b-2"))
|
||||
stopAndAwaitTermination(p1)
|
||||
|
||||
val p2 = system.actorOf(Props[StashingProcessor], name)
|
||||
p2 ! Persistent("a")
|
||||
p2 ! Persistent("b")
|
||||
p2 ! Persistent("c")
|
||||
p2 ! GetState
|
||||
expectMsg(List("a-1", "c-3", "b-2", "a-4", "c-6", "b-5"))
|
||||
}
|
||||
"support user stash and unstash operations for persistent and transient messages" in {
|
||||
val p1 = system.actorOf(Props[StashingProcessor], name)
|
||||
p1 ! Persistent("a")
|
||||
p1 ! "x"
|
||||
p1 ! Persistent("b")
|
||||
p1 ! Persistent("c")
|
||||
p1 ! GetState
|
||||
expectMsg(List("a-1", "c-3", "x-0", "b-2"))
|
||||
stopAndAwaitTermination(p1)
|
||||
|
||||
val p2 = system.actorOf(Props[StashingProcessor], name)
|
||||
p2 ! Persistent("a")
|
||||
p2 ! "x"
|
||||
p2 ! Persistent("b")
|
||||
p2 ! Persistent("c")
|
||||
p2 ! GetState
|
||||
expectMsg(List("a-1", "c-3", "b-2", "a-4", "c-6", "x-0", "b-5"))
|
||||
}
|
||||
"support restarts between user stash and unstash operations" in {
|
||||
val p1 = system.actorOf(Props[StashingProcessor], name)
|
||||
p1 ! Persistent("a")
|
||||
p1 ! Persistent("b")
|
||||
p1 ! "boom"
|
||||
p1 ! Persistent("c")
|
||||
p1 ! GetState
|
||||
expectMsg(List("a-1", "c-3", "b-2"))
|
||||
stopAndAwaitTermination(p1)
|
||||
|
||||
val p2 = system.actorOf(Props[StashingProcessor], name)
|
||||
p2 ! Persistent("a")
|
||||
p2 ! Persistent("b")
|
||||
p2 ! "boom"
|
||||
p2 ! Persistent("c")
|
||||
p2 ! GetState
|
||||
expectMsg(List("a-1", "c-3", "b-2", "a-4", "c-6", "b-5"))
|
||||
}
|
||||
"support multiple restarts between user stash and unstash operations" in {
|
||||
val p1 = system.actorOf(Props[RecoveryFailureStashingProcessor], name)
|
||||
p1 ! Persistent("a")
|
||||
p1 ! Persistent("b")
|
||||
p1 ! Persistent("boom")
|
||||
p1 ! Persistent("c")
|
||||
p1 ! GetState
|
||||
expectMsg(List("a-1", "c-4", "b-2"))
|
||||
stopAndAwaitTermination(p1)
|
||||
|
||||
val p2 = system.actorOf(Props[RecoveryFailureStashingProcessor], name)
|
||||
p2 ! Persistent("a")
|
||||
p2 ! Persistent("b")
|
||||
p2 ! Persistent("boom")
|
||||
p2 ! Persistent("c")
|
||||
p2 ! GetState
|
||||
expectMsg(List("a-1", "c-4", "b-2", "a-5", "c-8", "b-6"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,52 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package sample.persistence.japi;
|
||||
|
||||
import akka.actor.*;
|
||||
import akka.persistence.*;
|
||||
|
||||
public class ProcessorChannelExample {
|
||||
public static class ExampleProcessor extends UntypedProcessor {
|
||||
private ActorRef destination;
|
||||
private ActorRef channel;
|
||||
|
||||
public ExampleProcessor(ActorRef destination) {
|
||||
this.destination = destination;
|
||||
this.channel = getContext().actorOf(Channel.props(), "channel");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onReceive(Object message) throws Exception {
|
||||
if (message instanceof Persistent) {
|
||||
Persistent msg = (Persistent)message;
|
||||
System.out.println("processed " + msg.payload());
|
||||
channel.tell(Deliver.create(msg.withPayload("processed " + msg.payload()), destination), getSelf());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class ExampleDestination extends UntypedActor {
|
||||
@Override
|
||||
public void onReceive(Object message) throws Exception {
|
||||
if (message instanceof Persistent) {
|
||||
Persistent msg = (Persistent)message;
|
||||
msg.confirm();
|
||||
System.out.println("received " + msg.payload());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String... args) throws Exception {
|
||||
final ActorSystem system = ActorSystem.create("example");
|
||||
final ActorRef destination = system.actorOf(Props.create(ExampleDestination.class));
|
||||
final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class, destination), "processor");
|
||||
|
||||
processor.tell(Persistent.create("a"), null);
|
||||
processor.tell(Persistent.create("b"), null);
|
||||
|
||||
Thread.sleep(1000);
|
||||
system.shutdown();
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,75 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package sample.persistence.japi;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
||||
import scala.Option;
|
||||
|
||||
import akka.actor.*;
|
||||
import akka.persistence.*;
|
||||
|
||||
public class ProcessorFailureExample {
|
||||
public static class ExampleProcessor extends UntypedProcessor {
|
||||
private ArrayList<Object> received = new ArrayList<Object>();
|
||||
|
||||
@Override
|
||||
public void onReceive(Object message) throws Exception {
|
||||
if (message instanceof Persistent) {
|
||||
Persistent persistent = (Persistent)message;
|
||||
if (persistent.payload() == "boom") {
|
||||
throw new Exception("boom");
|
||||
} else {
|
||||
received.add(persistent.payload());
|
||||
}
|
||||
} else if (message == "boom") {
|
||||
throw new Exception("boom");
|
||||
} else if (message == "print") {
|
||||
System.out.println("received " + received);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preRestartProcessor(Throwable reason, Option<Object> message) throws Exception {
|
||||
if (message.isDefined() && message.get() instanceof Persistent) {
|
||||
delete((Persistent) message.get());
|
||||
}
|
||||
super.preRestartProcessor(reason, message);
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String... args) throws Exception {
|
||||
final ActorSystem system = ActorSystem.create("example");
|
||||
final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class), "processor");
|
||||
|
||||
processor.tell(Persistent.create("a"), null);
|
||||
processor.tell("print", null);
|
||||
processor.tell("boom", null);
|
||||
processor.tell("print", null);
|
||||
processor.tell(Persistent.create("b"), null);
|
||||
processor.tell("print", null);
|
||||
processor.tell(Persistent.create("boom"), null);
|
||||
processor.tell("print", null);
|
||||
processor.tell(Persistent.create("c"), null);
|
||||
processor.tell("print", null);
|
||||
|
||||
// Will print in a first run (i.e. with empty journal):
|
||||
|
||||
// received [a]
|
||||
// received [a, b]
|
||||
// received [a, b, c]
|
||||
|
||||
// Will print in a second run:
|
||||
|
||||
// received [a, b, c, a]
|
||||
// received [a, b, c, a, b]
|
||||
// received [a, b, c, a, b, c]
|
||||
|
||||
// etc ...
|
||||
|
||||
Thread.sleep(1000);
|
||||
system.shutdown();
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1 @@
|
|||
akka.persistence.journal.leveldb.dir = "target/journal-example"
|
||||
|
|
@ -0,0 +1,58 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package sample.persistence
|
||||
|
||||
import akka.actor._
|
||||
import akka.persistence._
|
||||
|
||||
object ConversationRecoveryExample extends App {
|
||||
case object Ping
|
||||
case object Pong
|
||||
|
||||
class Ping extends Processor {
|
||||
val pongChannel = context.actorOf(Channel.props, "pongChannel")
|
||||
var counter = 0
|
||||
|
||||
def receive = {
|
||||
case m @ Persistent(Ping, _) ⇒ {
|
||||
counter += 1
|
||||
println(s"received ping ${counter} times ...")
|
||||
m.confirm()
|
||||
if (!recoveryRunning) Thread.sleep(2000)
|
||||
pongChannel ! Deliver(m.withPayload(Pong), sender, Resolve.Destination)
|
||||
}
|
||||
case "init" ⇒ if (counter == 0) self forward Persistent(Ping)
|
||||
}
|
||||
|
||||
override def preStartProcessor() = ()
|
||||
}
|
||||
|
||||
class Pong extends Processor {
|
||||
val pingChannel = context.actorOf(Channel.props, "pingChannel")
|
||||
var counter = 0
|
||||
|
||||
def receive = {
|
||||
case m @ Persistent(Pong, _) ⇒ {
|
||||
counter += 1
|
||||
println(s"received pong ${counter} times ...")
|
||||
m.confirm()
|
||||
if (!recoveryRunning) Thread.sleep(2000)
|
||||
pingChannel ! Deliver(m.withPayload(Ping), sender, Resolve.Destination)
|
||||
}
|
||||
}
|
||||
|
||||
override def preStartProcessor() = ()
|
||||
}
|
||||
|
||||
val system = ActorSystem("example")
|
||||
|
||||
val ping = system.actorOf(Props(classOf[Ping]), "ping")
|
||||
val pong = system.actorOf(Props(classOf[Pong]), "pong")
|
||||
|
||||
ping ! Recover()
|
||||
pong ! Recover()
|
||||
|
||||
ping tell ("init", pong)
|
||||
}
|
||||
|
|
@ -0,0 +1,47 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package sample.persistence
|
||||
|
||||
import akka.actor._
|
||||
import akka.pattern.ask
|
||||
import akka.persistence._
|
||||
import akka.util.Timeout
|
||||
|
||||
object ProcessorChannelExample extends App {
|
||||
class ExampleProcessor extends Processor {
|
||||
val channel = context.actorOf(Channel.props, "channel")
|
||||
val destination = context.actorOf(Props[ExampleDestination])
|
||||
var received: List[Persistent] = Nil
|
||||
|
||||
def receive = {
|
||||
case p @ Persistent(payload, _) ⇒ {
|
||||
println(s"processed ${payload}")
|
||||
channel forward Deliver(p.withPayload(s"processed ${payload}"), destination)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class ExampleDestination extends Actor {
|
||||
def receive = {
|
||||
case p @ Persistent(payload, snr) ⇒ {
|
||||
println(s"received ${payload}")
|
||||
sender ! s"re: ${payload} (${snr})"
|
||||
p.confirm()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
val system = ActorSystem("example")
|
||||
val processor = system.actorOf(Props(classOf[ExampleProcessor]), "processor")
|
||||
|
||||
implicit val timeout = Timeout(3000)
|
||||
import system.dispatcher
|
||||
|
||||
processor ? Persistent("a") onSuccess { case reply ⇒ println(s"reply = ${reply}") }
|
||||
processor ? Persistent("b") onSuccess { case reply ⇒ println(s"reply = ${reply}") }
|
||||
|
||||
Thread.sleep(1000)
|
||||
system.shutdown()
|
||||
}
|
||||
|
|
@ -0,0 +1,60 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package sample.persistence
|
||||
|
||||
import akka.actor._
|
||||
import akka.persistence._
|
||||
|
||||
object ProcessorFailureExample extends App {
|
||||
class ExampleProcessor extends Processor {
|
||||
var received: List[String] = Nil // state
|
||||
|
||||
def receive = {
|
||||
case "print" ⇒ println(s"received ${received.reverse}")
|
||||
case "boom" ⇒ throw new Exception("boom")
|
||||
case Persistent("boom", _) ⇒ throw new Exception("boom")
|
||||
case Persistent(payload: String, _) ⇒ received = payload :: received
|
||||
}
|
||||
|
||||
override def preRestartProcessor(reason: Throwable, message: Option[Any]) {
|
||||
message match {
|
||||
case Some(p: Persistent) if !recoveryRunning ⇒ delete(p) // mark failing message as deleted
|
||||
case _ ⇒ // ignore
|
||||
}
|
||||
super.preRestartProcessor(reason, message)
|
||||
}
|
||||
}
|
||||
|
||||
val system = ActorSystem("example")
|
||||
val processor = system.actorOf(Props(classOf[ExampleProcessor]), "processor")
|
||||
|
||||
processor ! Persistent("a")
|
||||
processor ! "print"
|
||||
processor ! "boom" // restart and recovery
|
||||
processor ! "print"
|
||||
processor ! Persistent("b")
|
||||
processor ! "print"
|
||||
processor ! Persistent("boom") // restart, recovery and deletion of message from journal
|
||||
processor ! "print"
|
||||
processor ! Persistent("c")
|
||||
processor ! "print"
|
||||
|
||||
// Will print in a first run (i.e. with empty journal):
|
||||
|
||||
// received List(a)
|
||||
// received List(a, b)
|
||||
// received List(a, b, c)
|
||||
|
||||
// Will print in a second run:
|
||||
|
||||
// received List(a, b, c, a)
|
||||
// received List(a, b, c, a, b)
|
||||
// received List(a, b, c, a, b, c)
|
||||
|
||||
// etc ...
|
||||
|
||||
Thread.sleep(1000)
|
||||
system.shutdown()
|
||||
}
|
||||
|
|
@ -81,7 +81,7 @@ object AkkaBuild extends Build {
|
|||
|
||||
),
|
||||
aggregate = Seq(actor, testkit, actorTests, dataflow, remote, remoteTests, camel, cluster, slf4j, agent, transactor,
|
||||
mailboxes, zeroMQ, kernel, akkaSbtPlugin, osgi, osgiAries, docs, contrib, samples, channels, channelsTests,
|
||||
persistence, mailboxes, zeroMQ, kernel, akkaSbtPlugin, osgi, osgiAries, docs, contrib, samples, channels, channelsTests,
|
||||
multiNodeTestkit)
|
||||
)
|
||||
|
||||
|
|
@ -276,6 +276,16 @@ object AkkaBuild extends Build {
|
|||
)
|
||||
)
|
||||
|
||||
lazy val persistence = Project(
|
||||
id = "akka-persistence-experimental",
|
||||
base = file("akka-persistence"),
|
||||
dependencies = Seq(actor, testkit % "test->test"),
|
||||
settings = defaultSettings ++ scaladocSettings ++ experimentalSettings ++ javadocSettings ++ OSGi.persistence ++ Seq(
|
||||
libraryDependencies ++= Dependencies.persistence,
|
||||
previousArtifact := akkaPreviousArtifact("akka-persistence")
|
||||
)
|
||||
)
|
||||
|
||||
val testMailbox = SettingKey[Boolean]("test-mailbox")
|
||||
|
||||
lazy val mailboxes = Project(
|
||||
|
|
@ -431,7 +441,7 @@ object AkkaBuild extends Build {
|
|||
id = "akka-samples",
|
||||
base = file("akka-samples"),
|
||||
settings = parentSettings,
|
||||
aggregate = Seq(camelSample, fsmSample, helloSample, helloKernelSample, remoteSample, clusterSample, multiNodeSample, osgiDiningHakkersSample)
|
||||
aggregate = Seq(camelSample, fsmSample, helloSample, helloKernelSample, remoteSample, persistenceSample, clusterSample, multiNodeSample, osgiDiningHakkersSample)
|
||||
)
|
||||
|
||||
lazy val camelSample = Project(
|
||||
|
|
@ -469,6 +479,13 @@ object AkkaBuild extends Build {
|
|||
settings = sampleSettings
|
||||
)
|
||||
|
||||
lazy val persistenceSample = Project(
|
||||
id = "akka-sample-persistence",
|
||||
base = file("akka-samples/akka-sample-persistence"),
|
||||
dependencies = Seq(actor, persistence),
|
||||
settings = sampleSettings
|
||||
)
|
||||
|
||||
lazy val clusterSample = Project(
|
||||
id = "akka-sample-cluster",
|
||||
base = file("akka-samples/akka-sample-cluster"),
|
||||
|
|
@ -566,7 +583,8 @@ object AkkaBuild extends Build {
|
|||
id = "akka-docs",
|
||||
base = file("akka-docs"),
|
||||
dependencies = Seq(actor, testkit % "test->test", mailboxesCommon % "compile;test->test", channels,
|
||||
remote % "compile;test->test", cluster, slf4j, agent, dataflow, transactor, fileMailbox, zeroMQ, camel, osgi, osgiAries),
|
||||
remote % "compile;test->test", cluster, slf4j, agent, dataflow, transactor, fileMailbox, zeroMQ, camel, osgi, osgiAries,
|
||||
persistence % "compile;test->test"),
|
||||
settings = defaultSettings ++ site.settings ++ site.sphinxSupport() ++ site.publishSite ++ sphinxPreprocessing ++ cpsPlugin ++ Seq(
|
||||
sourceDirectory in Sphinx <<= baseDirectory / "rst",
|
||||
sphinxPackages in Sphinx <+= baseDirectory { _ / "_sphinx" / "pygments" },
|
||||
|
|
@ -990,6 +1008,8 @@ object AkkaBuild extends Build {
|
|||
|
||||
val transactor = exports(Seq("akka.transactor.*"))
|
||||
|
||||
val persistence = exports(Seq("akka.persistence.*"))
|
||||
|
||||
val testkit = exports(Seq("akka.testkit.*"))
|
||||
|
||||
val zeroMQ = exports(Seq("akka.zeromq.*"), imports = Seq(protobufImport()) )
|
||||
|
|
@ -1050,6 +1070,7 @@ object Dependencies {
|
|||
val ariesBlueprint = "org.apache.aries.blueprint" % "org.apache.aries.blueprint" % "1.1.0" // ApacheV2
|
||||
val osgiCore = "org.osgi" % "org.osgi.core" % "4.2.0" // ApacheV2
|
||||
val osgiCompendium= "org.osgi" % "org.osgi.compendium" % "4.2.0" // ApacheV2
|
||||
val levelDB = "org.iq80.leveldb" % "leveldb" % "0.5" // ApacheV2
|
||||
|
||||
// Camel Sample
|
||||
val camelJetty = "org.apache.camel" % "camel-jetty" % camelCore.revision // ApacheV2
|
||||
|
|
@ -1100,6 +1121,8 @@ object Dependencies {
|
|||
|
||||
val transactor = Seq(scalaStm, Test.scalatest, Test.junit)
|
||||
|
||||
val persistence = Seq(levelDB, Test.scalatest, Test.junit, Test.commonsIo)
|
||||
|
||||
val mailboxes = Seq(Test.scalatest, Test.junit)
|
||||
|
||||
val fileMailbox = Seq(Test.commonsIo, Test.scalatest, Test.junit)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue