Merge pull request #1863 from krasserm/wip-3761-reliable-channels-krasserm

!per #3761 Reliable channels
This commit is contained in:
Patrik Nordwall 2013-12-12 06:45:29 -08:00
commit 8b6b2965e5
27 changed files with 1980 additions and 844 deletions

View file

@ -52,6 +52,10 @@ inconsistent state is fatal. Thus, when the actor fails and is restarted by its
supervisor, the state will be created from scratch, like upon first creating
the actor. This is to enable the ability of self-healing of the system.
Optionally, an actor's state can be automatically recovered to the state
before a restart by persisting received messages and replaying them after
restart (see :ref:`persistence`).
Behavior
--------

View file

@ -257,11 +257,11 @@ throughput or lower latency by removing this guarantee again, which would mean
that choosing between different implementations would allow trading guarantees
versus performance.
Building On Top Of Akka
=======================
Higher-level abstractions
=========================
The philosophy of Akka is to provide a small and consistent tool set which is
well suited for building powerful abstractions on top.
Based on a small and consistent tool set in Akka's core, Akka also provides
powerful, higher-level abstractions on top it.
Messaging Patterns
------------------
@ -274,12 +274,15 @@ delivery is an explicit ACKRETRY protocol. In its simplest form this requires
- a retry mechanism which will resend messages if not acknowledged in time
- a way for the receiver to detect and discard duplicates
The third becomes necessary by virtue of the acknowledgements not being
guaranteed to arrive either. An example of implementing all three requirements
is shown at :ref:`reliable-proxy`. Another way of implementing the third part
would be to make processing the messages idempotent at the receiving end on the
level of the business logic; this is convenient if it arises naturally and
otherwise implemented by keeping track of processed message IDs.
The third becomes necessary by virtue of the acknowledgements not being guaranteed
to arrive either. An ACK-RETRY protocol with business-level acknowledgements is
supported by :ref:`channels` of the Akka Persistence module. Duplicates can be
detected by tracking the sequence numbers of messages received via channels.
Another way of implementing the third part would be to make processing the messages
idempotent on the level of the business logic.
Another example of implementing all three requirements is shown at
:ref:`reliable-proxy` (which is now superseded by :ref:`channels`).
Event Sourcing
--------------
@ -296,7 +299,7 @@ state on a different continent or to react to changes). If the components
state is lost—due to a machine failure or by being pushed out of a cache—it can
easily be reconstructed by replaying the event stream (usually employing
snapshots to speed up the process). :ref:`event-sourcing` is supported by
Akka (see :ref:`persistence`).
Akka Persistence.
Mailbox with Explicit Acknowledgement
-------------------------------------

View file

@ -62,6 +62,12 @@ It allows you to compose atomic message flows with automatic retry and rollback.
See :ref:`Transactors (Scala) <transactors-scala>` and :ref:`Transactors (Java) <transactors-java>`
Persistence
-----------
Messages received by an actor can optionally be persisted and replayed when the actor is started or
restarted. This allows actors to recover their state, even after JVM crashes or when being migrated
to another node.
Scala and Java APIs
===================

View file

@ -4,9 +4,13 @@
package docs.persistence;
import java.util.concurrent.TimeUnit;
import scala.Option;
import scala.concurrent.duration.Duration;
import akka.actor.*;
import akka.japi.Procedure;
import akka.persistence.*;
import static java.util.Arrays.asList;
@ -144,7 +148,10 @@ public class PersistenceDocTest {
public void onReceive(Object message) throws Exception {
if (message instanceof ConfirmablePersistent) {
ConfirmablePersistent p = (ConfirmablePersistent)message;
System.out.println("received " + p.payload());
Object payload = p.payload();
Long sequenceNr = p.sequenceNr();
int redeliveries = p.redeliveries();
// ...
p.confirm();
}
}
@ -160,6 +167,13 @@ public class PersistenceDocTest {
//#channel-id-override
this.channel = getContext().actorOf(Channel.props("my-stable-channel-id"));
//#channel-id-override
//#channel-custom-settings
getContext().actorOf(Channel.props(
ChannelSettings.create()
.withRedeliverInterval(Duration.create(30, TimeUnit.SECONDS))
.withRedeliverMax(15)));
//#channel-custom-settings
}
public void onReceive(Object message) throws Exception {
@ -273,12 +287,56 @@ public class PersistenceDocTest {
public void foo() {
//#persistent-channel-example
final ActorRef channel = getContext().actorOf(PersistentChannel.props(),
"myPersistentChannel");
final ActorRef channel = getContext().actorOf(PersistentChannel.props(
PersistentChannelSettings.create()
.withRedeliverInterval(Duration.create(30, TimeUnit.SECONDS))
.withRedeliverMax(15)), "myPersistentChannel");
channel.tell(Deliver.create(Persistent.create("example"), destination), getSelf());
//#persistent-channel-example
//#persistent-channel-reply
PersistentChannelSettings.create().withReplyPersistent(true);
//#persistent-channel-reply
}
}
};
static Object o8 = new Object() {
//#reliable-event-delivery
class MyEventsourcedProcessor extends UntypedEventsourcedProcessor {
private ActorRef destination;
private ActorRef channel;
public MyEventsourcedProcessor(ActorRef destination) {
this.destination = destination;
this.channel = getContext().actorOf(Channel.props(), "channel");
}
private void handleEvent(String event) {
// update state
// ...
// reliably deliver events
channel.tell(Deliver.create(Persistent.create(
event, getCurrentPersistentMessage()), destination), getSelf());
}
public void onReceiveReplay(Object msg) {
if (msg instanceof String) {
handleEvent((String)msg);
}
}
public void onReceiveCommand(Object msg) {
if (msg.equals("cmd")) {
persist("evt", new Procedure<String>() {
public void apply(String event) {
handleEvent(event);
}
});
}
}
}
//#reliable-event-delivery
};
}

View file

@ -11,7 +11,8 @@ persisted but never its current state directly (except for optional snapshots).
to storage, nothing is ever mutated, which allows for very high transaction rates and efficient replication. Stateful
actors are recovered by replaying stored changes to these actors from which they can rebuild internal state. This can
be either the full history of changes or starting from a snapshot of internal actor state which can dramatically
reduce recovery times.
reduce recovery times. Akka persistence also provides point-to-point communication channels with at-least-once
message delivery guarantees.
Storage backends for state changes and snapshots are pluggable in Akka persistence. Currently, these are written to
the local filesystem. Distributed and replicated storage, with the possibility of scaling writes, will be available
@ -48,7 +49,8 @@ Architecture
to that processor, so that it can recover internal state from these messages.
* *Channel*: Channels are used by processors to communicate with other actors. They prevent that replayed messages
are redundantly delivered to these actors.
are redundantly delivered to these actors and provide at-least-once message delivery guarantees, also in case of
sender and receiver JVM crashes.
* *Journal*: A journal stores the sequence of messages sent to a processor. An application can control which messages
are stored and which are received by the processor without being journaled. The storage backend of a journal is
@ -155,9 +157,17 @@ should override ``processorId``.
Later versions of Akka persistence will likely offer a possibility to migrate processor ids.
.. _channels-java:
Channels
========
.. warning::
There are further changes planned to the channel API that couldn't make it into the current milestone.
One example is to have only a single destination per channel to allow gap detection and more advanced
flow control.
Channels are special actors that are used by processors to communicate with other actors (channel destinations).
Channels prevent redundant delivery of replayed messages to destinations during processor recovery. A replayed
message is retained by a channel if its previous delivery has been confirmed by a destination.
@ -165,59 +175,106 @@ message is retained by a channel if its previous delivery has been confirmed by
.. includecode:: code/docs/persistence/PersistenceDocTest.java#channel-example
A channel is ready to use once it has been created, no recovery or further activation is needed. A ``Deliver``
request instructs a channel to send a ``Persistent`` message to a destination where the sender of the ``Deliver``
request is forwarded to the destination. A processor may also reply to a message sender directly by using
``getSender()`` as channel destination (not shown).
request instructs a channel to send a ``Persistent`` message to a destination. Sender references are preserved
by a channel, therefore, a destination can reply to the sender of a ``Deliver`` request.
If a processor wants to reply to a ``Persistent`` message sender it should use the ``getSender()`` reference as
channel destination.
.. includecode:: code/docs/persistence/PersistenceDocTest.java#channel-example-reply
Persistent messages delivered by a channel are of type ``ConfirmablePersistent``. It extends ``Persistent`` and
adds a ``confirm()`` method. Channel destinations confirm the delivery of a ``ConfirmablePersistent`` message by
calling ``confirm()``. This (asynchronously) writes a confirmation entry to the journal. Replayed messages
internally contain these confirmation entries which allows a channel to decide if a message should be retained or
not. ``ConfirmablePersistent`` messages can be used whereever ``Persistent`` messages are expected, which allows
processors to be used as channel destinations, for example.
Persistent messages delivered by a channel are of type ``ConfirmablePersistent``. ``ConfirmablePersistent`` extends
``Persistent`` by adding the methods ``confirm`` method and ``redeliveries`` (see also :ref:`redelivery-java`). Channel
destinations confirm the delivery of a ``ConfirmablePersistent`` message by calling ``confirm()`` an that message.
This asynchronously writes a confirmation entry to the journal. Replayed messages internally contain these confirmation
entries which allows a channel to decide if a message should be retained or not.
A ``Processor`` can also be used as channel destination i.e. it can persist ``ConfirmablePersistent`` messages too.
.. _redelivery-java:
Message re-delivery
-------------------
If an application crashes after a destination called ``confirm()`` but before the confirmation entry could have
been written to the journal then the unconfirmed message will be re-delivered during next recovery of the sending
processor. It is the destination's responsibility to detect the duplicate or simply process the message again if
it's an idempotent receiver. Duplicates can be detected, for example, by tracking sequence numbers.
Channels re-deliver messages to destinations if they do not confirm their receipt within a configurable timeout.
This timeout can be specified as ``redeliverInterval`` when creating a channel, optionally together with the
maximum number of re-deliveries a channel should attempt for each unconfirmed message.
Although a channel prevents message loss in case of sender (JVM) crashes it doesn't attempt re-deliveries if a
destination is unavailable. To achieve reliable communication with a (remote) target, a channel destination may
want to use the :ref:`reliable-proxy` or add the message to a queue that is managed by a third party message
broker, for example. In latter case, the channel destination will first add the received message to the queue
and then call ``confirm()`` on the received ``ConfirmablePersistent`` message.
.. includecode:: code/docs/persistence/PersistenceDocTest.java#channel-custom-settings
Message re-delivery is done out of order with regards to normal delivery i.e. redelivered messages may arrive
later than newer normally delivered messages. The number of re-delivery attempts can be obtained via the
``redeliveries`` method on ``ConfirmablePersistent``.
A channel keeps messages in memory until their successful delivery has been confirmed by their destination(s)
or their maximum number of re-deliveries is reached. In the latter case, the application has to re-send the
correspnding ``Deliver`` request to the channel so that the channel can start a new series of delivery attempts
(starting again with a ``redeliveries`` count of ``0``).
Re-sending ``Deliver`` requests is done automatically if the sending processor replays messages: only ``Deliver``
requests of unconfirmed messages will be served again by the channel. A message replay can be enforced by an
application by restarting the sending processor, for example. A replay will also take place if the whole
application is restarted, either after normal termination or after a crash.
This combination of
* message persistence by sending processors
* message replays by sending processors
* message re-deliveries by channels and
* application-level confirmations (acknowledgements) by destinations
enables channels to provide at-least-once message delivery guarantees. Possible duplicates can be detected by
destinations by tracking message sequence numbers. Message sequence numbers are generated per sending processor.
Depending on how a processor routes outbound messages to destinations, they may either see a contiguous message
sequence or a sequence with gaps.
.. warning::
If a processor emits more than one outbound message per inbound ``Persistent`` message it **must** use a
separate channel for each outbound message to ensure that confirmations are uniquely identifiable, otherwise,
at-least-once message delivery is not guaranteed. This rule has been introduced to avoid writing additional
outbound message identifiers to the journal which would decrease the overall throughput. It is furthermore
recommended to collapse multiple outbound messages to the same destination into a single outbound message,
otherwise, if sent via multiple channels, their ordering is not defined. These restrictions are likely to be
removed in the final release.
Whenever an application wants to have more control how sequence numbers are assigned to messages it should use
an application-specific sequence number generator and include the generated sequence numbers into the ``payload``
of ``Persistent`` messages.
Persistent channels
-------------------
Channels created with ``Channel.props`` do not persist messages. This is not necessary because these (transient)
channels shall only be used in combination with a sending processor that takes care of message persistence.
Channels created with ``Channel.props`` do not persist messages. These channels are usually used in combination
with a sending processor that takes care of persistence, hence, channel-specific persistence is not necessary in
this case. They are referred to as transient channels in the following.
However, if an application wants to use a channel standalone (without a sending processor), to prevent message
loss in case of a sender (JVM) crash, it should use a persistent channel which can be created with ``PersistentChannel.props``.
A persistent channel additionally persists messages before they are delivered. Persistence is achieved by an
internal processor that delegates delivery to a transient channel. A persistent channel, when used standalone,
can therefore provide the same message re-delivery semantics as a transient channel in combination with an
application-defined processor.
Applications may also use transient channels standalone (i.e. without a sending processor) if re-delivery attempts
to destinations are required but message loss in case of a sender JVM crash is not an issue. If applications want to
use standalone channels but message loss is not acceptable, they should use persistent channels. A persistent channel
can be created with ``PersistentChannel.props`` and configured with a ``PersistentChannelSettings`` object.
.. includecode:: code/docs/persistence/PersistenceDocTest.java#persistent-channel-example
A persistent channel is like a transient channel that additionally persists ``Deliver`` requests before serving it.
Hence, it can recover from sender JVM crashes and provide the same message re-delivery semantics as a transient
channel in combination with an application-defined processor.
By default, a persistent channel doesn't reply whether a ``Persistent`` message, sent with ``Deliver``, has been
successfully persisted or not. This can be enabled by creating the channel with the ``persistentReply`` parameter
set to ``true``: ``PersistentChannel.props(true)``. With this setting, either the successfully persisted message
is replied to the sender or a ``PersistenceFailure``. In case of a persistence failure, the sender should re-send
the message.
successfully persisted or not. This can be enabled by creating the channel with the ``replyPersistent`` configuration
parameter set to ``true``:
.. includecode:: code/docs/persistence/PersistenceDocTest.java#persistent-channel-reply
With this setting, either the successfully persisted message is replied to the sender or a ``PersistenceFailure``.
In case of a persistence failure, the sender should re-send the message.
Using a persistent channel in combination with an application-defined processor can make sense if destinations are
unavailable for a long time and an application doesn't want to buffer all messages in memory (but write them to the
journal instead). In this case, delivery can be disabled with ``DisableDelivery`` (to stop delivery and persist-only)
and re-enabled with ``EnableDelivery``. A disabled channel that receives ``EnableDelivery`` will restart itself and
re-deliver all persisted, unconfirmed messages before serving new ``Deliver`` requests.
journal only). In this case, delivery can be disabled by sending the channel a ``DisableDelivery`` message (to
stop delivery and persist-only) and re-enabled again by sending it an ``EnableDelivery`` message. A disabled channel
that receives an ``EnableDelivery`` message, processes all persisted, unconfirmed ``Deliver`` requests again before
serving new ones.
Sender resolution
-----------------
@ -272,7 +329,7 @@ Sequence number
---------------
The sequence number of a ``Persistent`` message can be obtained via its ``sequenceNr`` method. Persistent
messages are assigned sequence numbers on a per-processor basis (or per persistent channel basis if used
messages are assigned sequence numbers on a per-processor basis (or per channel basis if used
standalone). A sequence starts at ``1L`` and doesn't contain gaps unless a processor deletes a message.
.. _snapshots-java:
@ -365,6 +422,19 @@ The example also demonstrates how to change the processor's default behavior, de
another behavior, defined by ``otherCommandHandler``, and back using ``getContext().become()`` and
``getContext().unbecome()``. See also the API docs of ``persist`` for further details.
Reliable event delivery
-----------------------
Sending events from an event handler to another actor directly doesn't guarantee delivery of these events. To
guarantee at-least-once delivery, :ref:`channels-java` must be used. In this case, also replayed events (received by
``receiveReplay``) must be sent to a channel, as shown in the following example:
.. includecode:: code/docs/persistence/PersistenceDocTest.java#reliable-event-delivery
In larger integration scenarios, channel destinations may be actors that submit received events to an external
message broker, for example. After having successfully submitted an event, they should call ``confirm()`` on the
received ``ConfirmablePersistent`` message.
Batch writes
============
@ -458,6 +528,8 @@ directory. This location can be changed by configuration where the specified pat
With this plugin, each actor system runs its own private LevelDB instance.
.. _shared-leveldb-journal-java:
Shared LevelDB journal
----------------------
@ -518,3 +590,18 @@ it must add
to the application configuration. If not specified, a default serializer is used, which is the ``JavaSerializer``
in this example.
Testing
=======
When running tests with LevelDB default settings in ``sbt``, make sure to set ``fork := true`` in your sbt project
otherwise, you'll see an ``UnsatisfiedLinkError``. Alternatively, you can switch to a LevelDB Java port by setting
.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#native-config
or
.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#shared-store-native-config
in your Akka configuration. The latter setting applies if you're using a :ref:`shared-leveldb-journal-java`. The LevelDB
Java port is for testing purposes only.

View file

@ -4,6 +4,9 @@
package docs.persistence
import scala.concurrent.duration._
import scala.language.postfixOps
import akka.actor.ActorSystem
import akka.persistence._
@ -113,8 +116,8 @@ trait PersistenceDocSpec {
class MyDestination extends Actor {
def receive = {
case p @ ConfirmablePersistent(payload, _)
println(s"received ${payload}")
case p @ ConfirmablePersistent(payload, sequenceNr, redeliveries)
// ...
p.confirm()
}
}
@ -129,6 +132,12 @@ trait PersistenceDocSpec {
context.actorOf(Channel.props("my-stable-channel-id"))
//#channel-id-override
//#channel-custom-settings
context.actorOf(Channel.props(
ChannelSettings(redeliverInterval = 30 seconds, redeliverMax = 15)),
name = "myChannel")
//#channel-custom-settings
def receive = {
case p @ Persistent(payload, _)
//#channel-example-reply
@ -241,11 +250,44 @@ trait PersistenceDocSpec {
trait MyActor extends Actor {
val destination: ActorRef = null
//#persistent-channel-example
val channel = context.actorOf(PersistentChannel.props(),
val channel = context.actorOf(PersistentChannel.props(
PersistentChannelSettings(redeliverInterval = 30 seconds, redeliverMax = 15)),
name = "myPersistentChannel")
channel ! Deliver(Persistent("example"), destination)
//#persistent-channel-example
//#persistent-channel-reply
PersistentChannelSettings(replyPersistent = true)
//#persistent-channel-reply
}
}
new AnyRef {
import akka.actor.ActorRef
//#reliable-event-delivery
class MyEventsourcedProcessor(destination: ActorRef) extends EventsourcedProcessor {
val channel = context.actorOf(Channel.props("channel"))
def handleEvent(event: String) = {
// update state
// ...
// reliably deliver events
channel ! Deliver(Persistent(event), destination)
}
def receiveReplay: Receive = {
case event: String handleEvent(event)
}
def receiveCommand: Receive = {
case "cmd" {
// ...
persist("evt")(handleEvent)
}
}
}
//#reliable-event-delivery
}
}

View file

@ -32,6 +32,9 @@ object PersistencePluginDocSpec {
//#snapshot-config
akka.persistence.snapshot-store.local.dir = "target/snapshots"
//#snapshot-config
//#native-config
akka.persistence.journal.leveldb.native = off
//#native-config
"""
}
@ -80,6 +83,9 @@ object SharedLeveldbPluginDocSpec {
//#shared-journal-config
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
//#shared-journal-config
//#shared-store-native-config
akka.persistence.journal.leveldb-shared.store.native = off
//#shared-store-native-config
//#shared-store-config
akka.persistence.journal.leveldb-shared.store.dir = "target/shared"
//#shared-store-config

View file

@ -11,7 +11,8 @@ persisted but never its current state directly (except for optional snapshots).
to storage, nothing is ever mutated, which allows for very high transaction rates and efficient replication. Stateful
actors are recovered by replaying stored changes to these actors from which they can rebuild internal state. This can
be either the full history of changes or starting from a snapshot of internal actor state which can dramatically
reduce recovery times.
reduce recovery times. Akka persistence also provides point-to-point communication channels with at-least-once
message delivery guarantees.
Storage backends for state changes and snapshots are pluggable in Akka persistence. Currently, these are written to
the local filesystem. Distributed and replicated storage, with the possibility of scaling writes, will be available
@ -44,7 +45,8 @@ Architecture
to that processor, so that it can recover internal state from these messages.
* *Channel*: Channels are used by processors to communicate with other actors. They prevent that replayed messages
are redundantly delivered to these actors.
are redundantly delivered to these actors and provide at-least-once message delivery guarantees, also in case of
sender and receiver JVM crashes.
* *Journal*: A journal stores the sequence of messages sent to a processor. An application can control which messages
are stored and which are received by the processor without being journaled. The storage backend of a journal is
@ -150,9 +152,17 @@ should override ``processorId``.
Later versions of Akka persistence will likely offer a possibility to migrate processor ids.
.. _channels:
Channels
========
.. warning::
There are further changes planned to the channel API that couldn't make it into the current milestone.
One example is to have only a single destination per channel to allow gap detection and more advanced
flow control.
Channels are special actors that are used by processors to communicate with other actors (channel destinations).
Channels prevent redundant delivery of replayed messages to destinations during processor recovery. A replayed
message is retained by a channel if its previous delivery has been confirmed by a destination.
@ -160,59 +170,106 @@ message is retained by a channel if its previous delivery has been confirmed by
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#channel-example
A channel is ready to use once it has been created, no recovery or further activation is needed. A ``Deliver``
request instructs a channel to send a ``Persistent`` message to a destination where the sender of the ``Deliver``
request is forwarded to the destination. A processor may also reply to a message sender directly by using ``sender``
as channel destination (not shown).
request instructs a channel to send a ``Persistent`` message to a destination. Sender references are preserved
by a channel, therefore, a destination can reply to the sender of a ``Deliver`` request.
If a processor wants to reply to a ``Persistent`` message sender it should use the ``sender`` reference as channel
destination.
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#channel-example-reply
Persistent messages delivered by a channel are of type ``ConfirmablePersistent``. It extends ``Persistent`` and
adds a ``confirm()`` method. Channel destinations confirm the delivery of a ``ConfirmablePersistent`` message by
calling ``confirm()``. This (asynchronously) writes a confirmation entry to the journal. Replayed messages
internally contain these confirmation entries which allows a channel to decide if a message should be retained or
not. ``ConfirmablePersistent`` messages can be used whereever ``Persistent`` messages are expected, which allows
processors to be used as channel destinations, for example.
Persistent messages delivered by a channel are of type ``ConfirmablePersistent``. ``ConfirmablePersistent`` extends
``Persistent`` by adding the methods ``confirm`` method and ``redeliveries`` (see also :ref:`redelivery`). Channel
destinations confirm the delivery of a ``ConfirmablePersistent`` message by calling ``confirm()`` an that message.
This asynchronously writes a confirmation entry to the journal. Replayed messages internally contain these confirmation
entries which allows a channel to decide if a message should be retained or not.
A ``Processor`` can also be used as channel destination i.e. it can persist ``ConfirmablePersistent`` messages too.
.. _redelivery:
Message re-delivery
-------------------
If an application crashes after a destination called ``confirm()`` but before the confirmation entry could have
been written to the journal then the unconfirmed message will be re-delivered during next recovery of the sending
processor. It is the destination's responsibility to detect the duplicate or simply process the message again if
it's an idempotent receiver. Duplicates can be detected, for example, by tracking sequence numbers.
Channels re-deliver messages to destinations if they do not confirm their receipt within a configurable timeout.
This timeout can be specified as ``redeliverInterval`` when creating a channel, optionally together with the
maximum number of re-deliveries a channel should attempt for each unconfirmed message.
Although a channel prevents message loss in case of sender (JVM) crashes it doesn't attempt re-deliveries if a
destination is unavailable. To achieve reliable communication with a (remote) target, a channel destination may
want to use the :ref:`reliable-proxy` or add the message to a queue that is managed by a third party message
broker, for example. In latter case, the channel destination will first add the received message to the queue
and then call ``confirm()`` on the received ``ConfirmablePersistent`` message.
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#channel-custom-settings
Message re-delivery is done out of order with regards to normal delivery i.e. redelivered messages may arrive
later than newer normally delivered messages. The number of re-delivery attempts can be obtained via the
``redeliveries`` method on ``ConfirmablePersistent`` or by pattern matching.
A channel keeps messages in memory until their successful delivery has been confirmed by their destination(s)
or their maximum number of re-deliveries is reached. In the latter case, the application has to re-send the
correspnding ``Deliver`` request to the channel so that the channel can start a new series of delivery attempts
(starting again with a ``redeliveries`` count of ``0``).
Re-sending ``Deliver`` requests is done automatically if the sending processor replays messages: only ``Deliver``
requests of unconfirmed messages will be served again by the channel. A message replay can be enforced by an
application by restarting the sending processor, for example. A replay will also take place if the whole
application is restarted, either after normal termination or after a crash.
This combination of
* message persistence by sending processors
* message replays by sending processors
* message re-deliveries by channels and
* application-level confirmations (acknowledgements) by destinations
enables channels to provide at-least-once message delivery guarantees. Possible duplicates can be detected by
destinations by tracking message sequence numbers. Message sequence numbers are generated per sending processor.
Depending on how a processor routes outbound messages to destinations, they may either see a contiguous message
sequence or a sequence with gaps.
.. warning::
If a processor emits more than one outbound message per inbound ``Persistent`` message it **must** use a
separate channel for each outbound message to ensure that confirmations are uniquely identifiable, otherwise,
at-least-once message delivery is not guaranteed. This rule has been introduced to avoid writing additional
outbound message identifiers to the journal which would decrease the overall throughput. It is furthermore
recommended to collapse multiple outbound messages to the same destination into a single outbound message,
otherwise, if sent via multiple channels, their ordering is not defined. These restrictions are likely to be
removed in the final release.
Whenever an application wants to have more control how sequence numbers are assigned to messages it should use
an application-specific sequence number generator and include the generated sequence numbers into the ``payload``
of ``Persistent`` messages.
Persistent channels
-------------------
Channels created with ``Channel.props`` do not persist messages. This is not necessary because these (transient)
channels shall only be used in combination with a sending processor that takes care of message persistence.
Channels created with ``Channel.props`` do not persist messages. These channels are usually used in combination
with a sending processor that takes care of persistence, hence, channel-specific persistence is not necessary in
this case. They are referred to as transient channels in the following.
However, if an application wants to use a channel standalone (without a sending processor), to prevent message
loss in case of a sender (JVM) crash, it should use a persistent channel which can be created with ``PersistentChannel.props``.
A persistent channel additionally persists messages before they are delivered. Persistence is achieved by an
internal processor that delegates delivery to a transient channel. A persistent channel, when used standalone,
can therefore provide the same message re-delivery semantics as a transient channel in combination with an
application-defined processor.
Applications may also use transient channels standalone (i.e. without a sending processor) if re-delivery attempts
to destinations are required but message loss in case of a sender JVM crash is not an issue. If applications want to
use standalone channels but message loss is not acceptable, they should use persistent channels. A persistent channel
can be created with ``PersistentChannel.props`` and configured with a ``PersistentChannelSettings`` object.
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#persistent-channel-example
A persistent channel is like a transient channel that additionally persists ``Deliver`` requests before serving it.
Hence, it can recover from sender JVM crashes and provide the same message re-delivery semantics as a transient
channel in combination with an application-defined processor.
By default, a persistent channel doesn't reply whether a ``Persistent`` message, sent with ``Deliver``, has been
successfully persisted or not. This can be enabled by creating the channel with
``PersistentChannel.props(persistentReply = true)``. With this setting, either the successfully persisted message
is replied to the sender or a ``PersistenceFailure``. In case of a persistence failure, the sender should re-send
the message.
successfully persisted or not. This can be enabled by creating the channel with the ``replyPersistent`` configuration
parameter set to ``true``:
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#persistent-channel-reply
With this setting, either the successfully persisted message is replied to the sender or a ``PersistenceFailure``.
In case of a persistence failure, the sender should re-send the message.
Using a persistent channel in combination with an application-defined processor can make sense if destinations are
unavailable for a long time and an application doesn't want to buffer all messages in memory (but write them to the
journal instead). In this case, delivery can be disabled with ``DisableDelivery`` (to stop delivery and persist-only)
and re-enabled with ``EnableDelivery``. A disabled channel that receives ``EnableDelivery`` will restart itself and
re-deliver all persisted, unconfirmed messages before serving new ``Deliver`` requests.
journal only). In this case, delivery can be disabled by sending the channel a ``DisableDelivery`` message (to
stop delivery and persist-only) and re-enabled again by sending it an ``EnableDelivery`` message. A disabled channel
that receives an ``EnableDelivery`` message, processes all persisted, unconfirmed ``Deliver`` requests again before
serving new ones.
Sender resolution
-----------------
@ -279,7 +336,7 @@ method or by pattern matching
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#sequence-nr-pattern-matching
Persistent messages are assigned sequence numbers on a per-processor basis (or per persistent channel basis if used
Persistent messages are assigned sequence numbers on a per-processor basis (or per channel basis if used
standalone). A sequence starts at ``1L`` and doesn't contain gaps unless a processor deletes a message.
.. _snapshots:
@ -376,6 +433,19 @@ The example also demonstrates how to change the processor's default behavior, de
another behavior, defined by ``otherCommandHandler``, and back using ``context.become()`` and ``context.unbecome()``.
See also the API docs of ``persist`` for further details.
Reliable event delivery
-----------------------
Sending events from an event handler to another actor directly doesn't guarantee delivery of these events. To
guarantee at-least-once delivery, :ref:`channels` must be used. In this case, also replayed events (received by
``receiveReplay``) must be sent to a channel, as shown in the following example:
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#reliable-event-delivery
In larger integration scenarios, channel destinations may be actors that submit received events to an external
message broker, for example. After having successfully submitted an event, they should call ``confirm()`` on the
received ``ConfirmablePersistent`` message.
Batch writes
============
@ -469,6 +539,9 @@ directory. This location can be changed by configuration where the specified pat
With this plugin, each actor system runs its own private LevelDB instance.
.. _shared-leveldb-journal:
Shared LevelDB journal
----------------------
@ -537,6 +610,21 @@ it must add
to the application configuration. If not specified, a default serializer is used, which is the ``JavaSerializer``
in this example.
Testing
=======
When running tests with LevelDB default settings in ``sbt``, make sure to set ``fork := true`` in your sbt project
otherwise, you'll see an ``UnsatisfiedLinkError``. Alternatively, you can switch to a LevelDB Java port by setting
.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#native-config
or
.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#shared-store-native-config
in your Akka configuration. The latter setting applies if you're using a :ref:`shared-leveldb-journal`. The LevelDB
Java port is for testing purposes only.
Miscellaneous
=============

View file

@ -13,13 +13,14 @@ message PersistentMessage {
optional PersistentPayload payload = 1;
optional int64 sequenceNr = 2;
optional string processorId = 3;
optional bool deleted = 5;
optional bool resolved = 6;
repeated string confirms = 8;
optional bool confirmable = 11;
optional ConfirmMessage confirmMessage = 10;
optional string confirmTarget = 9;
optional string sender = 7;
optional bool deleted = 4;
optional bool resolved = 5;
optional int32 redeliveries = 6;
repeated string confirms = 7;
optional bool confirmable = 8;
optional ConfirmMessage confirmMessage = 9;
optional string confirmTarget = 10;
optional string sender = 11;
}
message PersistentPayload {
@ -30,8 +31,10 @@ message PersistentPayload {
message ConfirmMessage {
optional string processorId = 1;
optional int64 sequenceNr = 2;
optional int64 messageSequenceNr = 2;
optional string channelId = 3;
optional int64 wrapperSequenceNr = 4;
optional string channelEndpoint = 5;
}
message DeliverMessage {

View file

@ -26,10 +26,10 @@ akka {
journal {
# Maximum size of a persistent message batch written to the journal. Only applies to
# internally created batches by processors that receive persistent messages individually.
# Application-defined batches, even if larger than this setting, are always written as
# a single isolated batch.
# Maximum size of a persistent message batch written to the journal.
# Only applies to internally created batches by processors that receive
# persistent messages individually. Application-defined batches, even if
# larger than this setting, are always written as a single isolated batch.
max-batch-size = 200
# Path to the journal plugin to be used
@ -131,18 +131,18 @@ akka {
}
default-replay-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 2
core-pool-size-max = 8
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2
parallelism-max = 8
}
}
default-stream-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 2
core-pool-size-max = 8
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2
parallelism-max = 8
}
}
}

View file

@ -4,17 +4,63 @@
package akka.persistence
import akka.AkkaException
import akka.actor._
import scala.collection.immutable
import scala.concurrent.duration._
import scala.language.postfixOps
import akka.actor._
import akka.dispatch.Envelope
import akka.persistence.JournalProtocol.Confirm
import akka.persistence.serialization.Message
/**
* A [[Channel]] configuration object.
*
* @param redeliverMax maximum number of redeliveries (default is 5).
* @param redeliverInterval interval between redeliveries (default is 5 seconds).
*/
@SerialVersionUID(1L)
class ChannelSettings(
val redeliverMax: Int,
val redeliverInterval: FiniteDuration) extends Serializable {
/**
* Java API.
*/
def withRedeliverMax(redeliverMax: Int): ChannelSettings =
update(redeliverMax = redeliverMax)
/**
* Java API.
*/
def withRedeliverInterval(redeliverInterval: FiniteDuration): ChannelSettings =
update(redeliverInterval = redeliverInterval)
private def update(
redeliverMax: Int = redeliverMax,
redeliverInterval: FiniteDuration = redeliverInterval): ChannelSettings =
new ChannelSettings(redeliverMax, redeliverInterval)
}
object ChannelSettings {
def apply(
redeliverMax: Int = 5,
redeliverInterval: FiniteDuration = 5 seconds): ChannelSettings =
new ChannelSettings(redeliverMax, redeliverInterval)
/**
* Java API.
*/
def create() = apply()
}
/**
* A channel is used by [[Processor]]s for sending [[Persistent]] messages to destinations. The main
* responsibility of a channel is to prevent redundant delivery of replayed messages to destinations
* when a processor is recovered.
*
* A channel can be instructed to deliver a persistent message to a `destination` via the [[Deliver]]
* A channel is instructed to deliver a persistent message to a `destination` with the [[Deliver]]
* command.
*
* {{{
@ -53,224 +99,84 @@ import akka.persistence.serialization.Message
* {{{
* class MyDestination extends Actor {
* def receive = {
* case cp @ ConfirmablePersistent(payload, sequenceNr) => cp.confirm()
* case cp @ ConfirmablePersistent(payload, sequenceNr, redeliveries) => cp.confirm()
* }
* }
* }}}
*
* A channel will only re-deliver messages if the sending processor is recovered and delivery of these
* messages has not been confirmed yet. Hence, a channel can be used to avoid message loss in case of
* sender JVM crashes, for example. A channel, however, does not attempt any re-deliveries should a
* destination be unavailable. Re-delivery to destinations (in case of network failures or destination
* JVM crashes) is an application-level concern and can be done by using a reliable proxy, for example.
* If a destination does not confirm the receipt of a `ConfirmablePersistent` message, it will be redelivered
* by the channel according to the parameters in [[ChannelSettings]]. Message redelivery is done out of order
* with regards to normal delivery i.e. redelivered messages may arrive later than newer normally delivered
* messages. Redelivered messages have a `redeliveries` value greater than zero.
*
* If the maximum number of redeliveries for a certain message is reached and there is still no confirmation
* from the destination, then this message is removed from the channel. In order to deliver that message to
* the destination again, the processor must replay its stored messages to the channel (during start or restart).
* Replayed, unconfirmed messages are then processed and delivered by the channel again. These messages are now
* duplicates (with a `redeliveries` counter starting from zero). Duplicates can be detected by destinations
* by tracking message sequence numbers.
*
* @see [[Deliver]]
*/
sealed class Channel private[akka] (_channelId: Option[String]) extends Actor with Stash {
private val extension = Persistence(context.system)
final class Channel private[akka] (_channelId: Option[String], channelSettings: ChannelSettings) extends Actor {
private val id = _channelId match {
case Some(cid) cid
case None extension.channelId(self)
case None Persistence(context.system).channelId(self)
}
import ResolvedDelivery._
private val journal = Persistence(context.system).journalFor(id)
private val delivering: Actor.Receive = {
case Deliver(persistent: PersistentRepr, destination, resolve)
if (!persistent.confirms.contains(id)) {
val prepared = prepareDelivery(persistent)
resolve match {
case Resolve.Sender if !prepared.resolved
context.actorOf(Props(classOf[ResolvedSenderDelivery], prepared, destination, sender)) ! DeliverResolved
context.become(buffering, false)
case Resolve.Destination if !prepared.resolved
context.actorOf(Props(classOf[ResolvedDestinationDelivery], prepared, destination, sender)) ! DeliverResolved
context.become(buffering, false)
case _ destination tell (prepared, sender)
}
}
unstash()
private val reliableDelivery = context.actorOf(Props(classOf[ReliableDelivery], channelSettings))
private val resolvedDelivery = context.actorOf(Props(classOf[ResolvedDelivery], reliableDelivery))
def receive = {
case d @ Deliver(persistent: PersistentRepr, _, _)
if (!persistent.confirms.contains(id)) resolvedDelivery forward d.copy(prepareDelivery(persistent))
}
private val buffering: Actor.Receive = {
case DeliveredResolved | DeliveredUnresolved
context.unbecome()
unstash()
case _: Deliver stash()
}
def receive = delivering
private[akka] def prepareDelivery(persistent: PersistentRepr): PersistentRepr = {
ConfirmablePersistentImpl(
persistent = persistent,
confirmTarget = extension.journalFor(persistent.processorId),
private def prepareDelivery(persistent: PersistentRepr): PersistentRepr =
ConfirmablePersistentImpl(persistent,
confirmTarget = journal,
confirmMessage = Confirm(persistent.processorId, persistent.sequenceNr, id))
}
}
object Channel {
/**
* Returns a channel configuration object for creating a [[Channel]] with a
* generated id.
* Returns a channel actor configuration object for creating a [[Channel]] with a
* generated id and default [[ChannelSettings]].
*/
def props(): Props = Props(classOf[Channel], None)
def props(): Props =
props(ChannelSettings())
/**
* Returns a channel configuration object for creating a [[Channel]] with the
* specified id.
* Returns a channel actor configuration object for creating a [[Channel]] with a
* generated id and specified `channelSettings`.
*
* @param channelSettings channel configuration object.
*/
def props(channelSettings: ChannelSettings): Props =
Props(classOf[Channel], None, channelSettings)
/**
* Returns a channel actor configuration object for creating a [[Channel]] with the
* specified id and default [[ChannelSettings]].
*
* @param channelId channel id.
*/
def props(channelId: String): Props = Props(classOf[Channel], Some(channelId))
}
def props(channelId: String): Props =
props(channelId, ChannelSettings())
/**
* A [[PersistentChannel]] implements the same functionality as a [[Channel]] but additionally
* persists messages before they are delivered. Therefore, the main use case of a persistent
* channel is standalone usage i.e. independent of a sending [[Processor]]. Messages that have
* been persisted by a persistent channel are deleted again when destinations confirm the receipt
* of these messages.
*
* Using a persistent channel in combination with a [[Processor]] can make sense if destinations
* are unavailable for a long time and an application doesn't want to buffer all messages in
* memory (but write them to a journal instead). In this case, delivery can be disabled with
* [[DisableDelivery]] (to stop delivery and persist-only) and re-enabled with [[EnableDelivery]].
*
* A persistent channel can also be configured to reply whether persisting a message was successful
* or not (see `PersistentChannel.props` methods). If enabled, the sender will receive the persisted
* message as reply (i.e. a [[Persistent]] message), otherwise a [[PersistenceFailure]] message.
*
* A persistent channel will only re-deliver un-confirmed, stored messages if it is started or re-
* enabled with [[EnableDelivery]]. Hence, a persistent channel can be used to avoid message loss
* in case of sender JVM crashes, for example. A channel, however, does not attempt any re-deliveries
* should a destination be unavailable. Re-delivery to destinations (in case of network failures or
* destination JVM crashes) is an application-level concern and can be done by using a reliable proxy,
* for example.
*/
final class PersistentChannel private[akka] (_channelId: Option[String], persistentReply: Boolean) extends EventsourcedProcessor {
override val processorId = _channelId.getOrElse(super.processorId)
private val journal = Persistence(context.system).journalFor(processorId)
private val channel = context.actorOf(Props(classOf[NoPrepChannel], processorId))
private var deliveryEnabled = true
def receiveReplay: Receive = {
case Deliver(persistent: PersistentRepr, destination, resolve) deliver(prepareDelivery(persistent), destination, resolve)
}
def receiveCommand: Receive = {
case d @ Deliver(persistent: PersistentRepr, destination, resolve)
if (!persistent.confirms.contains(processorId)) {
persist(d) { _
val prepared = prepareDelivery(persistent)
if (persistent.processorId != PersistentRepr.Undefined)
journal ! Confirm(persistent.processorId, persistent.sequenceNr, processorId)
if (persistentReply)
sender ! prepared
if (deliveryEnabled)
deliver(prepared, destination, resolve)
}
}
case c: Confirm deleteMessage(c.sequenceNr, true)
case DisableDelivery deliveryEnabled = false
case EnableDelivery if (!deliveryEnabled) throw new ChannelRestartRequiredException
case p: PersistenceFailure if (persistentReply) sender ! p
}
private def prepareDelivery(persistent: PersistentRepr): PersistentRepr = currentPersistentMessage.map { current
val sequenceNr = if (persistent.sequenceNr == 0L) current.sequenceNr else persistent.sequenceNr
val resolved = persistent.resolved && current.asInstanceOf[PersistentRepr].resolved
persistent.update(sequenceNr = sequenceNr, resolved = resolved)
} getOrElse (persistent)
private def deliver(persistent: PersistentRepr, destination: ActorRef, resolve: Resolve.ResolveStrategy) = currentPersistentMessage.foreach { current
channel forward Deliver(persistent = ConfirmablePersistentImpl(persistent,
confirmTarget = self,
confirmMessage = Confirm(processorId, current.sequenceNr, PersistentRepr.Undefined)), destination, resolve)
}
}
object PersistentChannel {
/**
* Returns a channel configuration object for creating a [[PersistentChannel]] with a
* generated id. The sender will not receive persistence completion replies.
*/
def props(): Props = props(persistentReply = false)
/**
* Returns a channel configuration object for creating a [[PersistentChannel]] with a
* generated id.
*
* @param persistentReply if `true` the sender will receive the successfully stored
* [[Persistent]] message that has been submitted with a
* [[Deliver]] request, or a [[PersistenceFailure]] message
* in case of a persistence failure.
*/
def props(persistentReply: Boolean): Props = Props(classOf[PersistentChannel], None, persistentReply)
/**
* Returns a channel configuration object for creating a [[PersistentChannel]] with the
* specified id. The sender will not receive persistence completion replies.
* Returns a channel actor configuration object for creating a [[Channel]] with the
* specified id and specified `channelSettings`.
*
* @param channelId channel id.
* @param channelSettings channel configuration object.
*/
def props(channelId: String): Props = props(channelId, persistentReply = false)
/**
* Returns a channel configuration object for creating a [[PersistentChannel]] with the
* specified id.
*
* @param channelId channel id.
* @param persistentReply if `true` the sender will receive the successfully stored
* [[Persistent]] message that has been submitted with a
* [[Deliver]] request, or a [[PersistenceFailure]] message
* in case of a persistence failure.
*/
def props(channelId: String, persistentReply: Boolean): Props = Props(classOf[PersistentChannel], Some(channelId), persistentReply)
def props(channelId: String, channelSettings: ChannelSettings): Props =
Props(classOf[Channel], Some(channelId), channelSettings)
}
/**
* Instructs a [[PersistentChannel]] to disable the delivery of [[Persistent]] messages to their destination.
* The persistent channel, however, continues to persist messages (for later delivery).
*
* @see [[EnableDelivery]]
*/
@SerialVersionUID(1L)
case object DisableDelivery {
/**
* Java API.
*/
def getInstance = this
}
/**
* Instructs a [[PersistentChannel]] to re-enable the delivery of [[Persistent]] messages to their destination.
* This will first deliver all messages that have been stored by a persistent channel for which no confirmation
* is available yet. New [[Deliver]] requests are processed after all stored messages have been delivered. This
* request only has an effect if a persistent channel has previously been disabled with [[DisableDelivery]].
*
* @see [[DisableDelivery]]
*/
@SerialVersionUID(1L)
case object EnableDelivery {
/**
* Java API.
*/
def getInstance = this
}
/**
* Thrown by a persistent channel when [[EnableDelivery]] has been requested and delivery has been previously
* disabled for that channel.
*/
@SerialVersionUID(1L)
class ChannelRestartRequiredException extends AkkaException("channel restart required for enabling delivery")
/**
* Instructs a [[Channel]] or [[PersistentChannel]] to deliver `persistent` message to
* destination `destination`. The `resolve` parameter can be:
@ -375,68 +281,130 @@ object Resolve {
}
/**
* Resolved delivery support.
* Resolves actor references as specified by [[Deliver]] requests and then delegates delivery
* to `next`.
*/
private trait ResolvedDelivery extends Actor {
import scala.concurrent.duration._
import scala.language.postfixOps
import ResolvedDelivery._
private class ResolvedDelivery(next: ActorRef) extends Actor with Stash {
private var currentResolution: Envelope = _
context.setReceiveTimeout(5 seconds) // TODO: make configurable
private val delivering: Receive = {
case d @ Deliver(persistent: PersistentRepr, destination, resolve)
resolve match {
case Resolve.Sender if !persistent.resolved
context.actorSelection(sender.path) ! Identify(1)
context.become(resolving, discardOld = false)
currentResolution = Envelope(d, sender, context.system)
case Resolve.Destination if !persistent.resolved
context.actorSelection(destination.path) ! Identify(1)
context.become(resolving, discardOld = false)
currentResolution = Envelope(d, sender, context.system)
case _ next forward d
}
unstash()
}
def path: ActorPath
def onResolveSuccess(ref: ActorRef): Unit
def onResolveFailure(): Unit
private val resolving: Receive = {
case ActorIdentity(1, resolvedOption)
val Envelope(d: Deliver, sender) = currentResolution
if (d.resolve == Resolve.Sender) {
next tell (d, resolvedOption.getOrElse(sender))
} else if (d.resolve == Resolve.Destination) {
next tell (d.copy(destination = resolvedOption.getOrElse(d.destination)), sender)
}
context.unbecome()
unstash()
case _: Deliver stash()
}
def receive = delivering
}
/**
* Reliably deliver messages contained in [[Deliver]] requests to their destinations. Unconfirmed
* messages are redelivered according to the parameters in [[ChannelSettings]].
*/
private class ReliableDelivery(channelSettings: ChannelSettings) extends Actor {
import channelSettings._
import ReliableDelivery._
private val redelivery = context.actorOf(Props(classOf[Redelivery], channelSettings))
private var attempts: DeliveryAttempts = Map.empty
private var sequenceNr: Long = 0L
def receive = {
case DeliverResolved
context.actorSelection(path) ! Identify(1)
case ActorIdentity(1, Some(ref))
onResolveSuccess(ref)
shutdown(DeliveredResolved)
case ActorIdentity(1, None)
onResolveFailure()
shutdown(DeliveredUnresolved)
case ReceiveTimeout
onResolveFailure()
shutdown(DeliveredUnresolved)
case d @ Deliver(persistent: PersistentRepr, destination, _)
val dsnr = nextSequenceNr()
val psnr = persistent.sequenceNr
val confirm = persistent.confirmMessage.copy(channelEndpoint = self)
val updated = persistent.update(confirmMessage = confirm, sequenceNr = if (psnr == 0) dsnr else psnr)
destination forward updated
attempts += ((updated.processorId, updated.sequenceNr) -> DeliveryAttempt(updated, destination, sender, dsnr))
case c @ Confirm(processorId, messageSequenceNr, _, _, _)
attempts -= ((processorId, messageSequenceNr))
case Redeliver
val limit = System.nanoTime - redeliverInterval.toNanos
val (older, younger) = attempts.partition { case (_, a) a.timestamp < limit }
redelivery ! Redeliver(older, redeliverMax)
attempts = younger
}
def shutdown(message: Any) {
context.parent ! message
context.stop(self)
private def nextSequenceNr(): Long = {
sequenceNr += 1
sequenceNr
}
}
private object ResolvedDelivery {
case object DeliverResolved
case object DeliveredResolved
case object DeliveredUnresolved
private object ReliableDelivery {
type DeliveryAttempts = immutable.Map[(String, Long), DeliveryAttempt]
case class DeliveryAttempt(persistent: PersistentRepr, destination: ActorRef, sender: ActorRef, deliverySequenceNr: Long, timestamp: Long = System.nanoTime) {
def withChannelEndpoint(channelEndpoint: ActorRef) =
copy(persistent.update(confirmMessage = persistent.confirmMessage.copy(channelEndpoint = channelEndpoint)))
def incrementRedeliveryCount =
copy(persistent.update(redeliveries = persistent.redeliveries + 1))
}
case class Redeliver(attempts: DeliveryAttempts, redeliveryMax: Int)
}
/**
* Resolves `destination` before sending `persistent` message to the resolved destination using
* the specified sender (`sdr`) as message sender.
* Redelivery process used by [[ReliableDelivery]].
*/
private class ResolvedDestinationDelivery(persistent: PersistentRepr, destination: ActorRef, sdr: ActorRef) extends ResolvedDelivery {
val path = destination.path
def onResolveSuccess(ref: ActorRef) = ref tell (persistent.update(resolved = true), sdr)
def onResolveFailure() = destination tell (persistent, sdr)
private class Redelivery(channelSettings: ChannelSettings) extends Actor {
import context.dispatcher
import channelSettings._
import ReliableDelivery._
private var attempts: DeliveryAttempts = Map.empty
private var schedule: Cancellable = _
def receive = {
case Redeliver(as, max)
attempts ++= as.map { case (k, a) (k, a.withChannelEndpoint(self)) }
attempts = attempts.foldLeft[DeliveryAttempts](Map.empty) {
case (acc, (k, attempt))
// drop redelivery attempts that exceed redeliveryMax
if (attempt.persistent.redeliveries >= redeliverMax) acc
// increase redelivery count of attempt
else acc + (k -> attempt.incrementRedeliveryCount)
}
redeliver(attempts)
scheduleRedelivery()
case c @ Confirm(processorId, messageSequenceNr, _, _, _)
attempts -= ((processorId, messageSequenceNr))
}
/**
* Resolves `sdr` before sending `persistent` message to specified `destination` using
* the resolved sender as message sender.
*/
private class ResolvedSenderDelivery(persistent: PersistentRepr, destination: ActorRef, sdr: ActorRef) extends ResolvedDelivery {
val path = sdr.path
def onResolveSuccess(ref: ActorRef) = destination tell (persistent.update(resolved = true), ref)
def onResolveFailure() = destination tell (persistent, sdr)
override def preStart(): Unit =
scheduleRedelivery()
override def postStop(): Unit =
schedule.cancel()
private def scheduleRedelivery(): Unit =
schedule = context.system.scheduler.scheduleOnce(redeliverInterval, context.parent, Redeliver)
private def redeliver(attempts: DeliveryAttempts): Unit =
attempts.values.toSeq.sortBy(_.deliverySequenceNr).foreach(ad ad.destination tell (ad.persistent, ad.sender))
}
/**
* [[Channel]] specialization used by [[PersistentChannel]] to deliver stored messages.
*/
private class NoPrepChannel(channelId: String) extends Channel(Some(channelId)) {
override private[akka] def prepareDelivery(persistent: PersistentRepr) = persistent
}

View file

@ -8,6 +8,8 @@ import scala.collection.immutable
import akka.actor._
import akka.persistence.serialization.Message
/**
* INTERNAL API.
*
@ -22,6 +24,24 @@ private[persistence] object JournalProtocol {
*/
case class Delete(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean)
/**
* Message sent after confirming the receipt of a [[ConfirmablePersistent]] message.
*
* @param processorId id of the processor that sent the message corresponding to
* this confirmation to a channel.
* @param messageSequenceNr sequence number of the sent message.
* @param channelId id of the channel that delivered the message corresponding to
* this confirmation.
* @param wrapperSequenceNr sequence number of the message stored by a persistent
* channel. This message contains the [[Deliver]] request
* with the message identified by `processorId` and
* `messageSequenceNumber`.
* @param channelEndpoint actor reference that sent the the message corresponding to
* this confirmation. This is a child actor of the sending
* [[Channel]] or [[PersistentChannel]].
*/
case class Confirm(processorId: String, messageSequenceNr: Long, channelId: String, wrapperSequenceNr: Long = 0L, channelEndpoint: ActorRef = null) extends Message
/**
* Instructs a journal to persist a sequence of messages.
*

View file

@ -12,6 +12,7 @@ import scala.collection.immutable
import akka.actor.{ ActorContext, ActorRef }
import akka.japi.Util.immutableSeq
import akka.pattern.PromiseActorRef
import akka.persistence.JournalProtocol.Confirm
import akka.persistence.serialization.Message
/**
@ -85,14 +86,20 @@ sealed abstract class ConfirmablePersistent extends Persistent {
* persistent message.
*/
def confirm(): Unit
/**
* Number of redeliveries. Only greater than zero if message has been redelivered by a [[Channel]]
* or [[PersistentChannel]].
*/
def redeliveries: Int
}
object ConfirmablePersistent {
/**
* [[ConfirmablePersistent]] extractor.
*/
def unapply(persistent: ConfirmablePersistent): Option[(Any, Long)] =
Some((persistent.payload, persistent.sequenceNr))
def unapply(persistent: ConfirmablePersistent): Option[(Any, Long, Int)] =
Some((persistent.payload, persistent.sequenceNr, persistent.redeliveries))
}
/**
@ -145,6 +152,12 @@ trait PersistentRepr extends Persistent with Message {
*/
def resolved: Boolean
/**
* Number of redeliveries. Only greater than zero if message has been redelivered by a [[Channel]]
* or [[PersistentChannel]].
*/
def redeliveries: Int
/**
* Channel ids of delivery confirmations that are available for this message. Only non-empty
* for replayed messages.
@ -196,6 +209,7 @@ trait PersistentRepr extends Persistent with Message {
processorId: String = processorId,
deleted: Boolean = deleted,
resolved: Boolean = resolved,
redeliveries: Int = redeliveries,
confirms: immutable.Seq[String] = confirms,
confirmMessage: Confirm = confirmMessage,
confirmTarget: ActorRef = confirmTarget,
@ -217,12 +231,13 @@ object PersistentRepr {
processorId: String = PersistentRepr.Undefined,
deleted: Boolean = false,
resolved: Boolean = true,
redeliveries: Int = 0,
confirms: immutable.Seq[String] = Nil,
confirmable: Boolean = false,
confirmMessage: Confirm = null,
confirmTarget: ActorRef = null,
sender: ActorRef = null) =
if (confirmable) ConfirmablePersistentImpl(payload, sequenceNr, processorId, deleted, resolved, confirms, confirmMessage, confirmTarget, sender)
if (confirmable) ConfirmablePersistentImpl(payload, sequenceNr, processorId, deleted, resolved, redeliveries, confirms, confirmMessage, confirmTarget, sender)
else PersistentImpl(payload, sequenceNr, processorId, deleted, confirms, sender)
/**
@ -261,6 +276,7 @@ private[persistence] case class PersistentImpl(
processorId: String,
deleted: Boolean,
resolved: Boolean,
redeliveries: Int,
confirms: immutable.Seq[String],
confirmMessage: Confirm,
confirmTarget: ActorRef,
@ -268,6 +284,7 @@ private[persistence] case class PersistentImpl(
copy(sequenceNr = sequenceNr, processorId = processorId, deleted = deleted, confirms = confirms, sender = sender)
val resolved: Boolean = false
val redeliveries: Int = 0
val confirmable: Boolean = false
val confirmMessage: Confirm = null
val confirmTarget: ActorRef = null
@ -282,12 +299,13 @@ private[persistence] case class ConfirmablePersistentImpl(
processorId: String,
deleted: Boolean,
resolved: Boolean,
redeliveries: Int,
confirms: immutable.Seq[String],
confirmMessage: Confirm,
confirmTarget: ActorRef,
sender: ActorRef) extends ConfirmablePersistent with PersistentRepr {
def withPayload(payload: Any): Persistent =
def withPayload(payload: Any): ConfirmablePersistent =
copy(payload = payload)
def confirm(): Unit =
@ -298,21 +316,14 @@ private[persistence] case class ConfirmablePersistentImpl(
def prepareWrite(sender: ActorRef) =
copy(sender = sender, resolved = false, confirmMessage = null, confirmTarget = null)
def update(sequenceNr: Long, processorId: String, deleted: Boolean, resolved: Boolean, confirms: immutable.Seq[String], confirmMessage: Confirm, confirmTarget: ActorRef, sender: ActorRef) =
copy(sequenceNr = sequenceNr, processorId = processorId, deleted = deleted, resolved = resolved, confirms = confirms, confirmMessage = confirmMessage, confirmTarget = confirmTarget, sender = sender)
def update(sequenceNr: Long, processorId: String, deleted: Boolean, resolved: Boolean, redeliveries: Int, confirms: immutable.Seq[String], confirmMessage: Confirm, confirmTarget: ActorRef, sender: ActorRef) =
copy(sequenceNr = sequenceNr, processorId = processorId, deleted = deleted, resolved = resolved, redeliveries = redeliveries, confirms = confirms, confirmMessage = confirmMessage, confirmTarget = confirmTarget, sender = sender)
}
/**
* INTERNAL API.
*/
private[persistence] object ConfirmablePersistentImpl {
def apply(persistent: PersistentRepr, confirmMessage: Confirm, confirmTarget: ActorRef): ConfirmablePersistentImpl =
ConfirmablePersistentImpl(persistent.payload, persistent.sequenceNr, persistent.processorId, persistent.deleted, persistent.resolved, persistent.confirms, confirmMessage, confirmTarget, persistent.sender)
def apply(persistent: PersistentRepr, confirmMessage: Confirm, confirmTarget: ActorRef = null): ConfirmablePersistentImpl =
ConfirmablePersistentImpl(persistent.payload, persistent.sequenceNr, persistent.processorId, persistent.deleted, persistent.resolved, persistent.redeliveries, persistent.confirms, confirmMessage, confirmTarget, persistent.sender)
}
/**
* INTERNAL API.
*
* Message to confirm the receipt of a [[ConfirmablePersistent]] message.
*/
private[persistence] case class Confirm(processorId: String, sequenceNr: Long, channelId: String) extends Message

View file

@ -0,0 +1,231 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import scala.concurrent.duration._
import scala.language.postfixOps
import akka.AkkaException
import akka.actor._
import akka.persistence.JournalProtocol.Confirm
/**
* A [[PersistentChannel]] configuration object.
*
* @param redeliverMax maximum number of redeliveries (default is 5).
* @param redeliverInterval interval between redeliveries (default is 5 seconds).
* @param replyPersistent if `true` the sender will receive the successfully stored [[Persistent]]
* message that has been submitted with a [[Deliver]] request, or a
* [[PersistenceFailure]] message in case of a persistence failure.
*/
class PersistentChannelSettings(
redeliverMax: Int,
redeliverInterval: FiniteDuration,
val replyPersistent: Boolean) extends ChannelSettings(redeliverMax, redeliverInterval) {
/**
* Java API.
*/
override def withRedeliverMax(redeliverMax: Int): PersistentChannelSettings =
updatePersistent(redeliverMax = redeliverMax)
/**
* Java API.
*/
override def withRedeliverInterval(redeliverInterval: FiniteDuration): PersistentChannelSettings =
updatePersistent(redeliverInterval = redeliverInterval)
/**
* Java API.
*/
def withReplyPersistent(replayPersistent: Boolean) =
updatePersistent(replyPersistent = replyPersistent)
private def updatePersistent( // compile error if method name is 'update'
redeliverMax: Int = redeliverMax,
redeliverInterval: FiniteDuration = redeliverInterval,
replyPersistent: Boolean = replyPersistent): PersistentChannelSettings =
new PersistentChannelSettings(redeliverMax, redeliverInterval, replyPersistent)
}
object PersistentChannelSettings {
def apply(
redeliverMax: Int = 5,
redeliverInterval: FiniteDuration = 5 seconds,
replyPersistent: Boolean = false): PersistentChannelSettings =
new PersistentChannelSettings(redeliverMax, redeliverInterval, replyPersistent)
/**
* Java API.
*/
def create() = apply()
}
/**
* A [[PersistentChannel]] implements the same functionality as a [[Channel]] but additionally
* persists messages before they are delivered. This is done by using internally a special-purpose
* [[Processor]]. Therefore, the main use case of a persistent channel is standalone usage i.e.
* independent of an application-specific [[Processor]] sending messages to a channel. Messages
* that have been persisted by a persistent channel are deleted when destinations confirm the
* receipt of these messages.
*
* Using a persistent channel in combination with a [[Processor]] can make sense if destinations
* are unavailable for a long time and an application doesn't want to buffer all messages in
* memory (but write them to the journal instead). In this case, delivery can be disabled with
* [[DisableDelivery]] (to stop delivery and persist-only) and re-enabled with [[EnableDelivery]].
* `EnableDelivery` replays persistent messages to this channel and the channel delivers all
* unconfirmed messages again (which may then show up as duplicates at destinations as described
* in the API docs of [[Channel]]. Duplicates can be detected by tracking message sequence numbers
* and redelivery counters).
*
* A persistent channel can also reply to [[Deliver]] senders whether persisting a message was
* successful or not (see `replyPersistent` of [[PersistentChannelSettings]]). If enabled, the
* sender will receive the persisted message as reply (i.e. a [[Persistent]] message), otherwise
* a [[PersistenceFailure]] message.
*/
final class PersistentChannel private[akka] (_channelId: Option[String], channelSettings: PersistentChannelSettings) extends Actor {
private val id = _channelId match {
case Some(cid) cid
case None Persistence(context.system).channelId(self)
}
private val reliableDelivery = context.actorOf(Props(classOf[ReliableDelivery], channelSettings))
private val resolvedDelivery = context.actorOf(Props(classOf[ResolvedDelivery], reliableDelivery))
private val reliableStorage = context.actorOf(Props(classOf[ReliableStorage], id, channelSettings, resolvedDelivery))
def receive = {
case d @ Deliver(persistent: PersistentRepr, destination, resolve)
// Persist the Deliver request by sending reliableStorage a Persistent message
// with the Deliver request as payload. This persistent message is referred to
// as the wrapper message, whereas the persistent message contained in the Deliver
// request is referred to as wrapped message (see also class ReliableStorage).
if (!persistent.confirms.contains(id)) reliableStorage forward Persistent(d)
case DisableDelivery reliableStorage ! DisableDelivery
case EnableDelivery reliableStorage ! EnableDelivery
}
}
object PersistentChannel {
/**
* Returns a channel actor configuration object for creating a [[PersistentChannel]] with a
* generated id and default [[PersistentChannelSettings]].
*/
def props(): Props = props(PersistentChannelSettings())
/**
* Returns a channel actor configuration object for creating a [[PersistentChannel]] with a
* generated id and specified `channelSettings`.
*
* @param channelSettings channel configuration object.
*/
def props(channelSettings: PersistentChannelSettings): Props =
Props(classOf[PersistentChannel], None, channelSettings)
/**
* Returns a channel actor configuration object for creating a [[PersistentChannel]] with the
* specified id and default [[PersistentChannelSettings]].
*
* @param channelId channel id.
*/
def props(channelId: String): Props =
props(channelId, PersistentChannelSettings())
/**
* Returns a channel actor configuration object for creating a [[PersistentChannel]] with the
* specified id and specified `channelSettings`.
*
* @param channelId channel id.
* @param channelSettings channel configuration object.
*/
def props(channelId: String, channelSettings: PersistentChannelSettings): Props =
Props(classOf[PersistentChannel], Some(channelId), channelSettings)
}
/**
* Instructs a [[PersistentChannel]] to disable the delivery of [[Persistent]] messages to their destination.
* The persistent channel, however, continues to persist messages (for later delivery).
*
* @see [[EnableDelivery]]
*/
@SerialVersionUID(1L)
case object DisableDelivery {
/**
* Java API.
*/
def getInstance = this
}
/**
* Instructs a [[PersistentChannel]] to re-enable the delivery of [[Persistent]] messages to their destination.
* This will first deliver all messages that have been stored by a persistent channel for which no confirmation
* is available yet. New [[Deliver]] requests are processed after all stored messages have been delivered. This
* request only has an effect if a persistent channel has previously been disabled with [[DisableDelivery]].
*
* @see [[DisableDelivery]]
*/
@SerialVersionUID(1L)
case object EnableDelivery {
/**
* Java API.
*/
def getInstance = this
}
/**
* Thrown by a persistent channel when [[EnableDelivery]] has been requested and delivery has been previously
* disabled for that channel.
*/
@SerialVersionUID(1L)
class ChannelRestartRequiredException extends AkkaException("channel restart required for enabling delivery")
private class ReliableStorage(channelId: String, channelSettings: PersistentChannelSettings, next: ActorRef) extends Processor {
import channelSettings._
override val processorId = channelId
private val journal = Persistence(context.system).journalFor(channelId)
private var deliveryEnabled = true
def receive = {
case p @ Persistent(d @ Deliver(wrapped: PersistentRepr, destination, resolve), snr)
val wrapper = p.asInstanceOf[PersistentRepr]
val prepared = prepareDelivery(wrapped, wrapper)
if (!recoveryRunning && wrapped.processorId != PersistentRepr.Undefined)
// Write a delivery confirmation to the journal so that replayed Deliver
// requests from a sending processor are not persisted again. Replaying
// Deliver requests is now the responsibility of this processor.
journal ! Confirm(prepared.processorId, prepared.sequenceNr, channelId)
if (!recoveryRunning && replyPersistent)
sender ! prepared
if (deliveryEnabled)
next forward d.copy(prepared)
case p: PersistenceFailure if (replyPersistent) sender ! p
case EnableDelivery if (!deliveryEnabled) throw new ChannelRestartRequiredException
case DisableDelivery deliveryEnabled = false
}
/**
* @param wrapped persistent message contained in a deliver request
* @param wrapper persistent message that contains a deliver request
*/
private def prepareDelivery(wrapped: PersistentRepr, wrapper: PersistentRepr): PersistentRepr = {
// use the sequence number of the wrapper message if the channel is used standalone,
// otherwise, use sequence number of the wrapped message (that has been generated by
// the sending processor).
val sequenceNr = if (wrapped.sequenceNr == 0L) wrapper.sequenceNr else wrapped.sequenceNr
val resolved = wrapped.resolved && wrapper.asInstanceOf[PersistentRepr].resolved
val updated = wrapped.update(sequenceNr = sequenceNr, resolved = resolved)
// include the wrapper sequence number in the Confirm message so that the wrapper can
// be deleted later when the confirmation arrives.
ConfirmablePersistentImpl(updated,
confirmTarget = journal,
confirmMessage = Confirm(updated.processorId, sequenceNr, channelId, wrapper.sequenceNr))
}
}

View file

@ -10,7 +10,7 @@ import scala.concurrent.Future
import scala.util._
import akka.actor._
import akka.pattern.{ pipe, PromiseActorRef }
import akka.pattern.pipe
import akka.persistence._
import akka.persistence.JournalProtocol._
@ -51,11 +51,22 @@ trait AsyncWriteJournal extends Actor with AsyncReplay {
} recover {
case e ReplayFailure(e)
} pipeTo (processor)
case c @ Confirm(processorId, sequenceNr, channelId)
confirmAsync(processorId, sequenceNr, channelId) onComplete {
case Success(_) if (extension.publishPluginCommands) context.system.eventStream.publish(c)
case c @ Confirm(processorId, messageSequenceNr, channelId, wrapperSequenceNr, channelEndpoint)
val op = if (wrapperSequenceNr == 0L) {
// A wrapperSequenceNr == 0L means that the corresponding message was delivered by a
// transient channel. We can now write a delivery confirmation for this message.
confirmAsync(processorId, messageSequenceNr, channelId)
} else {
// A wrapperSequenceNr != 0L means that the corresponding message was delivered by a
// persistent channel. We can now safely delete the wrapper message (that contains the
// delivered message).
deleteAsync(channelId, wrapperSequenceNr, wrapperSequenceNr, true)
}
op onComplete {
case Success(_)
if (extension.publishPluginCommands) context.system.eventStream.publish(c)
if (channelEndpoint != null) channelEndpoint ! c
case Failure(e) // TODO: publish failure to event stream
context.system.eventStream.publish(c)
}
case d @ Delete(processorId, fromSequenceNr, toSequenceNr, permanent)
deleteAsync(processorId, fromSequenceNr, toSequenceNr, permanent) onComplete {

View file

@ -9,7 +9,7 @@ import scala.collection.immutable
import scala.util._
import akka.actor.Actor
import akka.pattern.{ pipe, PromiseActorRef }
import akka.pattern.pipe
import akka.persistence._
/**
@ -40,8 +40,18 @@ trait SyncWriteJournal extends Actor with AsyncReplay {
} recover {
case e ReplayFailure(e)
} pipeTo (processor)
case c @ Confirm(processorId, sequenceNr, channelId)
confirm(processorId, sequenceNr, channelId)
case c @ Confirm(processorId, messageSequenceNr, channelId, wrapperSequenceNr, channelEndpoint)
if (wrapperSequenceNr == 0L) {
// A wrapperSequenceNr == 0L means that the corresponding message was delivered by a
// transient channel. We can now write a delivery confirmation for this message.
confirm(processorId, messageSequenceNr, channelId)
} else {
// A wrapperSequenceNr != 0L means that the corresponding message was delivered by a
// persistent channel. We can now safely delete the wrapper message (that contains the
// delivered message).
delete(channelId, wrapperSequenceNr, wrapperSequenceNr, true)
}
if (channelEndpoint != null) channelEndpoint ! c
if (extension.publishPluginCommands) context.system.eventStream.publish(c)
case d @ Delete(processorId, fromSequenceNr, toSequenceNr, permanent)
delete(processorId, fromSequenceNr, toSequenceNr, permanent)

View file

@ -11,6 +11,7 @@ import com.google.protobuf._
import akka.actor.ExtendedActorSystem
import akka.japi.Util.immutableSeq
import akka.persistence._
import akka.persistence.JournalProtocol.Confirm
import akka.persistence.serialization.MessageFormats._
import akka.persistence.serialization.MessageFormats.DeliverMessage.ResolveStrategy
import akka.serialization._
@ -100,6 +101,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
builder.setSequenceNr(persistent.sequenceNr)
builder.setDeleted(persistent.deleted)
builder.setResolved(persistent.resolved)
builder.setRedeliveries(persistent.redeliveries)
builder.setConfirmable(persistent.confirmable)
builder
}
@ -116,10 +118,15 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
}
private def confirmMessageBuilder(confirm: Confirm) = {
ConfirmMessage.newBuilder
.setProcessorId(confirm.processorId)
.setSequenceNr(confirm.sequenceNr)
.setChannelId(confirm.channelId)
val builder = ConfirmMessage.newBuilder
if (confirm.channelEndpoint != null) builder.setChannelEndpoint(Serialization.serializedActorPath(confirm.channelEndpoint))
builder.setProcessorId(confirm.processorId)
builder.setMessageSequenceNr(confirm.messageSequenceNr)
builder.setChannelId(confirm.channelId)
builder.setWrapperSequenceNr(confirm.wrapperSequenceNr)
builder
}
//
@ -147,6 +154,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
if (persistentMessage.hasProcessorId) persistentMessage.getProcessorId else Undefined,
persistentMessage.getDeleted,
persistentMessage.getResolved,
persistentMessage.getRedeliveries,
immutableSeq(persistentMessage.getConfirmsList),
persistentMessage.getConfirmable,
if (persistentMessage.hasConfirmMessage) confirm(persistentMessage.getConfirmMessage) else null,
@ -167,7 +175,9 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
private def confirm(confirmMessage: ConfirmMessage): Confirm = {
Confirm(
confirmMessage.getProcessorId,
confirmMessage.getSequenceNr,
confirmMessage.getChannelId)
confirmMessage.getMessageSequenceNr,
confirmMessage.getChannelId,
confirmMessage.getWrapperSequenceNr,
if (confirmMessage.hasChannelEndpoint) system.provider.resolveActorRef(confirmMessage.getChannelEndpoint) else null)
}
}

View file

@ -4,44 +4,36 @@
package akka.persistence
import scala.concurrent.duration._
import scala.language.postfixOps
import com.typesafe.config._
import akka.actor._
import akka.testkit._
import akka.persistence.JournalProtocol.Confirm
object ChannelSpec {
class TestProcessor(name: String, channelProps: Props) extends NamedProcessor(name) {
val destination = context.actorOf(Props[TestDestination])
val channel = context.actorOf(channelProps)
def receive = {
case m @ Persistent(s: String, _) if s.startsWith("a")
// forward to destination via channel,
// destination replies to initial sender
channel forward Deliver(m.withPayload(s"fw: ${s}"), destination)
case m @ Persistent(s: String, _) if s.startsWith("b")
// reply to sender via channel
channel ! Deliver(m.withPayload(s"re: ${s}"), sender)
}
}
class TestDestination extends Actor {
def receive = {
case m: Persistent sender ! m
}
}
class TestReceiver(testActor: ActorRef) extends Actor {
def receive = {
case Persistent(payload, _) testActor ! payload
case m: ConfirmablePersistent sender ! m
}
}
class TestDestinationProcessor(name: String) extends NamedProcessor(name) {
def receive = {
case cp @ ConfirmablePersistent("a", _) cp.confirm()
case cp @ ConfirmablePersistent("b", _) cp.confirm()
case cp @ ConfirmablePersistent("boom", _) if (recoveryFinished) throw new TestException("boom")
case cp @ ConfirmablePersistent("a", _, _) cp.confirm()
case cp @ ConfirmablePersistent("b", _, _) cp.confirm()
case cp @ ConfirmablePersistent("boom", _, _) if (recoveryFinished) throw new TestException("boom")
}
}
class TestReceiver(testActor: ActorRef) extends Actor {
def receive = {
case cp @ ConfirmablePersistent(payload, _, _)
testActor ! payload
cp.confirm()
}
}
}
@ -49,32 +41,29 @@ object ChannelSpec {
abstract class ChannelSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender {
import ChannelSpec._
override protected def beforeEach() {
protected var defaultTestChannel: ActorRef = _
protected var redeliverTestChannel: ActorRef = _
override protected def beforeEach: Unit = {
super.beforeEach()
val confirmProbe = TestProbe()
val forwardProbe = TestProbe()
val replyProbe = TestProbe()
val processor = system.actorOf(Props(classOf[TestProcessor], name, channelProps(s"${name}-channel")))
subscribeToConfirmation(confirmProbe)
processor tell (Persistent("a1"), forwardProbe.ref)
processor tell (Persistent("b1"), replyProbe.ref)
forwardProbe.expectMsgPF() { case m @ ConfirmablePersistent("fw: a1", _) m.confirm() }
replyProbe.expectMsgPF() { case m @ ConfirmablePersistent("re: b1", _) m.confirm() }
awaitConfirmation(confirmProbe)
awaitConfirmation(confirmProbe)
defaultTestChannel = createDefaultTestChannel()
redeliverTestChannel = createRedeliverTestChannel()
}
def actorRefFor(topLevelName: String) =
extension.system.provider.resolveActorRef(RootActorPath(Address("akka", system.name)) / "user" / topLevelName)
override protected def afterEach(): Unit = {
system.stop(defaultTestChannel)
system.stop(redeliverTestChannel)
super.afterEach()
}
def channelProps(channelId: String): Props =
Channel.props(channelId)
def redeliverChannelSettings: ChannelSettings =
ChannelSettings(redeliverMax = 2, redeliverInterval = 100 milliseconds)
def createDefaultTestChannel(): ActorRef =
system.actorOf(Channel.props(name, ChannelSettings()))
def createRedeliverTestChannel(): ActorRef =
system.actorOf(Channel.props(name, redeliverChannelSettings))
def subscribeToConfirmation(probe: TestProbe): Unit =
system.eventStream.subscribe(probe.ref, classOf[Confirm])
@ -82,196 +71,117 @@ abstract class ChannelSpec(config: Config) extends AkkaSpec(config) with Persist
def awaitConfirmation(probe: TestProbe): Unit =
probe.expectMsgType[Confirm]
def actorRefFor(topLevelName: String) =
extension.system.provider.resolveActorRef(RootActorPath(Address("akka", system.name)) / "user" / topLevelName)
"A channel" must {
"forward new messages to destination" in {
val processor = system.actorOf(Props(classOf[TestProcessor], name, channelProps(s"${name}-channel")))
processor ! Persistent("a2")
expectMsgPF() { case m @ ConfirmablePersistent("fw: a2", _) m.confirm() }
}
"reply new messages to senders" in {
val processor = system.actorOf(Props(classOf[TestProcessor], name, channelProps(s"${name}-channel")))
processor ! Persistent("b2")
expectMsgPF() { case m @ ConfirmablePersistent("re: b2", _) m.confirm() }
}
"forward un-confirmed stored messages to destination during recovery" in {
val confirmProbe = TestProbe()
val forwardProbe = TestProbe()
subscribeToConfirmation(confirmProbe)
val processor1 = system.actorOf(Props(classOf[TestProcessor], name, channelProps(s"${name}-channel")))
processor1 tell (Persistent("a1"), forwardProbe.ref)
processor1 tell (Persistent("a2"), forwardProbe.ref)
forwardProbe.expectMsgPF() { case m @ ConfirmablePersistent("fw: a1", _) /* no confirmation */ }
forwardProbe.expectMsgPF() { case m @ ConfirmablePersistent("fw: a2", _) m.confirm() }
awaitConfirmation(confirmProbe)
val processor2 = system.actorOf(Props(classOf[TestProcessor], name, channelProps(s"${name}-channel")))
processor2 tell (Persistent("a3"), forwardProbe.ref)
forwardProbe.expectMsgPF() { case m @ ConfirmablePersistent("fw: a1", _) m.confirm() } // sender still valid, no need to resolve
forwardProbe.expectMsgPF() { case m @ ConfirmablePersistent("fw: a3", _) m.confirm() }
awaitConfirmation(confirmProbe)
awaitConfirmation(confirmProbe)
}
"must resolve sender references and preserve message order" in {
val channel = system.actorOf(channelProps("channel-1"))
val destination = system.actorOf(Props[TestDestination])
val empty = actorRefFor("testSender") // will be an EmptyLocalActorRef
val sender = system.actorOf(Props(classOf[TestReceiver], testActor), "testSender")
// replayed message (resolved = false) and invalid sender reference
channel tell (Deliver(PersistentRepr("a", resolved = false), destination, Resolve.Sender), empty)
defaultTestChannel tell (Deliver(PersistentRepr("a", resolved = false), destination, Resolve.Sender), empty)
// new messages (resolved = true) and valid sender references
channel tell (Deliver(Persistent("b"), destination), sender)
channel tell (Deliver(Persistent("c"), destination), sender)
defaultTestChannel tell (Deliver(Persistent("b"), destination), sender)
defaultTestChannel tell (Deliver(Persistent("c"), destination), sender)
expectMsg("a")
expectMsg("b")
expectMsg("c")
}
"must resolve destination references and preserve message order" in {
val channel = system.actorOf(channelProps("channel-2"))
val empty = actorRefFor("testDestination") // will be an EmptyLocalActorRef
val destination = system.actorOf(Props(classOf[TestReceiver], testActor), "testDestination")
// replayed message (resolved = false) and invalid destination reference
channel ! Deliver(PersistentRepr("a", resolved = false), empty, Resolve.Destination)
defaultTestChannel ! Deliver(PersistentRepr("a", resolved = false), empty, Resolve.Destination)
// new messages (resolved = true) and valid destination references
channel ! Deliver(Persistent("b"), destination)
channel ! Deliver(Persistent("c"), destination)
defaultTestChannel ! Deliver(Persistent("b"), destination)
defaultTestChannel ! Deliver(Persistent("c"), destination)
expectMsg("a")
expectMsg("b")
expectMsg("c")
}
"support processors as destination" in {
val channel = system.actorOf(channelProps(s"${name}-channel-new"))
val destination = system.actorOf(Props(classOf[TestDestinationProcessor], s"${name}-new"))
val destination = system.actorOf(Props(classOf[TestDestinationProcessor], name))
val confirmProbe = TestProbe()
subscribeToConfirmation(confirmProbe)
channel ! Deliver(Persistent("a"), destination)
defaultTestChannel ! Deliver(Persistent("a"), destination)
awaitConfirmation(confirmProbe)
}
"support processors as destination that may fail" in {
val channel = system.actorOf(channelProps(s"${name}-channel-new"))
val destination = system.actorOf(Props(classOf[TestDestinationProcessor], s"${name}-new"))
val destination = system.actorOf(Props(classOf[TestDestinationProcessor], name))
val confirmProbe = TestProbe()
subscribeToConfirmation(confirmProbe)
channel ! Deliver(Persistent("a"), destination)
channel ! Deliver(Persistent("boom"), destination)
channel ! Deliver(Persistent("b"), destination)
defaultTestChannel ! Deliver(Persistent("a"), destination)
defaultTestChannel ! Deliver(Persistent("boom"), destination)
defaultTestChannel ! Deliver(Persistent("b"), destination)
awaitConfirmation(confirmProbe)
awaitConfirmation(confirmProbe)
}
"accept confirmable persistent messages for delivery" in {
val channel = system.actorOf(channelProps(s"${name}-channel-new"))
val destination = system.actorOf(Props[TestDestination])
val confirmProbe = TestProbe()
subscribeToConfirmation(confirmProbe)
channel ! Deliver(PersistentRepr("a", confirmable = true), destination)
defaultTestChannel ! Deliver(PersistentRepr("a", confirmable = true), destination)
expectMsgPF() { case m @ ConfirmablePersistent("a", _) m.confirm() }
expectMsgPF() { case m @ ConfirmablePersistent("a", _, _) m.confirm() }
awaitConfirmation(confirmProbe)
}
"redeliver on missing confirmation" in {
val probe = TestProbe()
redeliverTestChannel ! Deliver(Persistent("b"), probe.ref)
probe.expectMsgPF() { case m @ ConfirmablePersistent("b", _, redeliveries) redeliveries must be(0) }
probe.expectMsgPF() { case m @ ConfirmablePersistent("b", _, redeliveries) redeliveries must be(1) }
probe.expectMsgPF() { case m @ ConfirmablePersistent("b", _, redeliveries) redeliveries must be(2); m.confirm() }
}
"redeliver in correct relative order" in {
val deliveries = redeliverChannelSettings.redeliverMax + 1
val interval = redeliverChannelSettings.redeliverInterval.toMillis / 5 * 4
val probe = TestProbe()
val cycles = 9
1 to cycles foreach { i
redeliverTestChannel ! Deliver(Persistent(i), probe.ref)
Thread.sleep(interval)
}
abstract class PersistentChannelSpec(config: Config) extends ChannelSpec(config) {
override def channelProps(channelId: String): Props =
PersistentChannel.props(channelId)
override def subscribeToConfirmation(probe: TestProbe): Unit =
system.eventStream.subscribe(probe.ref, classOf[JournalProtocol.Delete])
override def awaitConfirmation(probe: TestProbe): Unit =
probe.expectMsgType[JournalProtocol.Delete]
"A persistent channel" must {
"support disabling and re-enabling delivery" in {
val channel = system.actorOf(channelProps(s"${name}-channel"))
val confirmProbe = TestProbe()
subscribeToConfirmation(confirmProbe)
channel ! Deliver(Persistent("a"), testActor)
expectMsgPF() { case m @ ConfirmablePersistent("a", _) m.confirm() }
awaitConfirmation(confirmProbe)
channel ! DisableDelivery
channel ! Deliver(Persistent("b"), testActor)
channel ! EnableDelivery
channel ! Deliver(Persistent("c"), testActor)
expectMsgPF() { case m @ ConfirmablePersistent("b", _) m.confirm() }
expectMsgPF() { case m @ ConfirmablePersistent("c", _) m.confirm() }
}
"support Persistent replies to Deliver senders" in {
val channel = system.actorOf(PersistentChannel.props(s"${name}-channel-new", true))
channel ! Deliver(Persistent("a"), system.deadLetters)
expectMsgPF() { case Persistent("a", 1) }
channel ! Deliver(PersistentRepr("b", sequenceNr = 13), system.deadLetters)
expectMsgPF() { case Persistent("b", 13) }
}
"must not modify certain persistent message field" in {
val channel = system.actorOf(channelProps(s"${name}-channel-new"))
val persistent1 = PersistentRepr(payload = "a", processorId = "p1", confirms = List("c1", "c2"), sender = channel, sequenceNr = 13)
val persistent2 = PersistentRepr(payload = "b", processorId = "p1", confirms = List("c1", "c2"), sender = channel)
channel ! Deliver(persistent1, testActor)
channel ! Deliver(persistent2, testActor)
expectMsgPF() { case ConfirmablePersistentImpl("a", 13, "p1", _, _, Seq("c1", "c2"), _, _, channel) }
expectMsgPF() { case ConfirmablePersistentImpl("b", 2, "p1", _, _, Seq("c1", "c2"), _, _, channel) }
}
val received = (1 to (cycles * deliveries)).foldLeft(Vector.empty[ConfirmablePersistent]) {
case (acc, _) acc :+ probe.expectMsgType[ConfirmablePersistent]
}
"A persistent channel" when {
"used standalone" must {
"redeliver un-confirmed stored messages during recovery" in {
val confirmProbe = TestProbe()
val forwardProbe = TestProbe()
val grouped = received.groupBy(_.redeliveries)
val expected = 1 to 9 toVector
subscribeToConfirmation(confirmProbe)
val channel1 = system.actorOf(channelProps(s"${name}-channel"))
channel1 tell (Deliver(Persistent("a1"), forwardProbe.ref), null)
channel1 tell (Deliver(Persistent("a2"), forwardProbe.ref), null)
forwardProbe.expectMsgPF() { case m @ ConfirmablePersistent("a1", _) /* no confirmation */ }
forwardProbe.expectMsgPF() { case m @ ConfirmablePersistent("a2", _) m.confirm() }
awaitConfirmation(confirmProbe)
val channel2 = system.actorOf(channelProps(s"${name}-channel"))
channel2 tell (Deliver(Persistent("a3"), forwardProbe.ref), null)
forwardProbe.expectMsgPF() { case m @ ConfirmablePersistent("a1", _) m.confirm() } // sender still valid, no need to resolve
forwardProbe.expectMsgPF() { case m @ ConfirmablePersistent("a3", _) m.confirm() }
awaitConfirmation(confirmProbe)
awaitConfirmation(confirmProbe)
grouped(0).map(_.payload) must be(expected)
grouped(1).map(_.payload) must be(expected)
grouped(2).map(_.payload) must be(expected)
}
"redeliver not more than redeliverMax on missing confirmation" in {
val probe = TestProbe()
redeliverTestChannel ! Deliver(PersistentRepr("a"), probe.ref)
probe.expectMsgPF() { case m @ ConfirmablePersistent("a", _, redeliveries) redeliveries must be(0) }
probe.expectMsgPF() { case m @ ConfirmablePersistent("a", _, redeliveries) redeliveries must be(1) }
probe.expectMsgPF() { case m @ ConfirmablePersistent("a", _, redeliveries) redeliveries must be(2) }
probe.expectNoMsg(300 milliseconds)
}
}
}
@ -279,5 +189,3 @@ abstract class PersistentChannelSpec(config: Config) extends ChannelSpec(config)
class LeveldbChannelSpec extends ChannelSpec(PersistenceSpec.config("leveldb", "channel"))
class InmemChannelSpec extends ChannelSpec(PersistenceSpec.config("inmem", "channel"))
class LeveldbPersistentChannelSpec extends PersistentChannelSpec(PersistenceSpec.config("leveldb", "persistent-channel"))
class InmemPersistentChannelSpec extends PersistentChannelSpec(PersistenceSpec.config("inmem", "persistent-channel"))

View file

@ -18,6 +18,7 @@ object FailureSpec {
s"""
akka.persistence.processor.chaos.live-processing-failure-rate = 0.3
akka.persistence.processor.chaos.replay-processing-failure-rate = 0.1
akka.persistence.destination.chaos.confirm-failure-rate = 0.3
akka.persistence.journal.plugin = "akka.persistence.journal.chaos"
akka.persistence.journal.chaos.write-failure-rate = 0.3
akka.persistence.journal.chaos.delete-failure-rate = 0.3
@ -34,28 +35,42 @@ object FailureSpec {
case class ProcessingFailure(i: Int)
case class JournalingFailure(i: Int)
class ChaosProcessor extends Processor with ActorLogging {
trait ChaosSupport { this: Actor
def random = ThreadLocalRandom.current
var state = Vector.empty[Int]
def contains(i: Int): Boolean =
state.contains(i)
def add(i: Int): Unit = {
state :+= i
if (state.length == numMessages) sender ! Done(state)
}
def shouldFail(rate: Double) =
random.nextDouble() < rate
}
class ChaosProcessor(destination: ActorRef) extends Processor with ChaosSupport with ActorLogging {
val config = context.system.settings.config.getConfig("akka.persistence.processor.chaos")
val liveProcessingFailureRate = config.getDouble("live-processing-failure-rate")
val replayProcessingFailureRate = config.getDouble("replay-processing-failure-rate")
// processor state
var ints = Vector.empty[Int]
val channel = context.actorOf(Channel.props("channel", ChannelSettings(redeliverMax = 10, redeliverInterval = 500 milliseconds)), "channel")
override def processorId = "chaos"
def random = ThreadLocalRandom.current
def receive = {
case Persistent(i: Int, _)
case p @ Persistent(i: Int, _)
val failureRate = if (recoveryRunning) replayProcessingFailureRate else liveProcessingFailureRate
if (ints.contains(i)) {
if (contains(i)) {
log.debug(debugMessage(s"ignored duplicate ${i}"))
} else if (shouldFail(failureRate)) {
throw new TestException(debugMessage(s"rejected payload ${i}"))
} else {
ints :+= i
if (ints.length == numMessages) sender ! Done(ints)
add(i)
channel forward Deliver(p, destination)
log.debug(debugMessage(s"processed payload ${i}"))
}
case PersistenceFailure(i: Int, _, _)
@ -78,15 +93,34 @@ object FailureSpec {
super.preRestart(reason, message)
}
private def shouldFail(rate: Double) =
random.nextDouble() < rate
private def debugMessage(msg: String): String =
s"${msg} (mode = ${if (recoveryRunning) "replay" else "live"} snr = ${lastSequenceNr} state = ${ints.sorted})"
s"[processor] ${msg} (mode = ${if (recoveryRunning) "replay" else "live"} snr = ${lastSequenceNr} state = ${state.sorted})"
}
class ChaosDestination extends Actor with ChaosSupport with ActorLogging {
val config = context.system.settings.config.getConfig("akka.persistence.destination.chaos")
val confirmFailureRate = config.getDouble("confirm-failure-rate")
def receive = {
case cp @ ConfirmablePersistent(i: Int, _, _)
if (shouldFail(confirmFailureRate)) {
log.error(debugMessage("confirm message failed", cp))
} else if (contains(i)) {
log.debug(debugMessage("ignored duplicate", cp))
} else {
add(i)
cp.confirm()
log.debug(debugMessage("received and confirmed message", cp))
}
}
private def debugMessage(msg: String, cp: ConfirmablePersistent): String =
s"[destination] ${msg} (message = ConfirmablePersistent(${cp.payload}, ${cp.sequenceNr}, ${cp.redeliveries}), state = ${state.sorted})"
}
class ChaosProcessorApp(probe: ActorRef) extends Actor with ActorLogging {
val processor = context.actorOf(Props[ChaosProcessor])
val destination = context.actorOf(Props[ChaosDestination], "destination")
val processor = context.actorOf(Props(classOf[ChaosProcessor], destination), "processor")
def receive = {
case Start 1 to numMessages foreach (processor ! Persistent(_))
@ -107,10 +141,15 @@ class FailureSpec extends AkkaSpec(FailureSpec.config) with Cleanup with Implici
"The journaling protocol (= conversation between a processor and a journal)" must {
"tolerate and recover from random failures" in {
system.actorOf(Props(classOf[ChaosProcessorApp], testActor)) ! Start
expectMsgPF(numMessages seconds) { case Done(ints) ints.sorted must be(1 to numMessages toVector) }
expectDone() // by processor
expectDone() // by destination
system.actorOf(Props(classOf[ChaosProcessorApp], testActor)) // recovery of new instance must have same outcome
expectDone() // by processor
// destination doesn't receive messages again because all have been confirmed already
}
}
def expectDone() =
expectMsgPF(numMessages seconds) { case Done(ints) ints.sorted must be(1 to numMessages toVector) }
}
}
}

View file

@ -21,7 +21,7 @@ object PerformanceSpec {
case object StopMeasure
case class FailAt(sequenceNr: Long)
abstract class PerformanceTestProcessor(name: String) extends NamedProcessor(name) {
trait Measure extends { this: Actor
val NanoToSecond = 1000.0 * 1000 * 1000
var startTime: Long = 0L
@ -30,16 +30,43 @@ object PerformanceSpec {
var startSequenceNr = 0L;
var stopSequenceNr = 0L;
var failAt: Long = -1
val controlBehavior: Receive = {
case StartMeasure
def startMeasure(): Unit = {
startSequenceNr = lastSequenceNr
startTime = System.nanoTime
case StopMeasure
}
def stopMeasure(): Unit = {
stopSequenceNr = lastSequenceNr
stopTime = System.nanoTime
sender ! (NanoToSecond * (stopSequenceNr - startSequenceNr) / (stopTime - startTime))
}
def lastSequenceNr: Long
}
class PerformanceTestDestination extends Actor with Measure {
var lastSequenceNr = 0L
val confirm: PartialFunction[Any, Any] = {
case cp @ ConfirmablePersistent(payload, sequenceNr, _)
lastSequenceNr = sequenceNr
cp.confirm()
payload
}
def receive = confirm andThen {
case StartMeasure startMeasure()
case StopMeasure stopMeasure()
case m if (lastSequenceNr % 1000 == 0) print(".")
}
}
abstract class PerformanceTestProcessor(name: String) extends NamedProcessor(name) with Measure {
var failAt: Long = -1
val controlBehavior: Receive = {
case StartMeasure startMeasure()
case StopMeasure stopMeasure()
case FailAt(sequenceNr) failAt = sequenceNr
}
@ -136,6 +163,18 @@ class PerformanceSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "perfor
}
}
def stressPersistentChannel(): Unit = {
val channel = system.actorOf(PersistentChannel.props())
val destination = system.actorOf(Props[PerformanceTestDestination])
1 to warmupCycles foreach { i channel ! Deliver(Persistent(s"msg${i}"), destination) }
channel ! Deliver(Persistent(StartMeasure), destination)
1 to loadCycles foreach { i channel ! Deliver(Persistent(s"msg${i}"), destination) }
channel ! Deliver(Persistent(StopMeasure), destination)
expectMsgPF(100 seconds) {
case throughput: Double println(f"\nthroughput = $throughput%.2f persistent commands per second")
}
}
"A command sourced processor" should {
"have some reasonable throughput" in {
stressCommandsourcedProcessor(None)
@ -156,4 +195,10 @@ class PerformanceSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "perfor
stressStashingEventsourcedProcessor()
}
}
"A persistent channel" should {
"have some reasonable throughput" in {
stressPersistentChannel()
}
}
}

View file

@ -32,7 +32,7 @@ trait PersistenceSpec extends BeforeAndAfterEach with Cleanup { this: AkkaSpec
/**
* Prefix for generating a unique name per test.
*/
def namePrefix: String = "processor"
def namePrefix: String = "test"
/**
* Creates a processor with current name as constructor argument.
@ -41,7 +41,7 @@ trait PersistenceSpec extends BeforeAndAfterEach with Cleanup { this: AkkaSpec
system.actorOf(Props(implicitly[ClassTag[T]].runtimeClass, name))
override protected def beforeEach() {
_name = namePrefix + counter.incrementAndGet()
_name = s"${namePrefix}-${counter.incrementAndGet()}"
}
}

View file

@ -0,0 +1,103 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import scala.concurrent.duration._
import scala.language.postfixOps
import com.typesafe.config._
import akka.actor._
import akka.testkit._
abstract class PersistentChannelSpec(config: Config) extends ChannelSpec(config) {
override def redeliverChannelSettings: PersistentChannelSettings =
PersistentChannelSettings(redeliverMax = 2, redeliverInterval = 100 milliseconds)
override def createDefaultTestChannel(): ActorRef =
system.actorOf(PersistentChannel.props(name, PersistentChannelSettings()))
override def createRedeliverTestChannel(): ActorRef =
system.actorOf(PersistentChannel.props(name, redeliverChannelSettings))
"A persistent channel" must {
"support disabling and re-enabling delivery" in {
val confirmProbe = TestProbe()
subscribeToConfirmation(confirmProbe)
defaultTestChannel ! Deliver(Persistent("a"), testActor)
expectMsgPF() { case m @ ConfirmablePersistent("a", _, _) m.confirm() }
awaitConfirmation(confirmProbe)
defaultTestChannel ! DisableDelivery
defaultTestChannel ! Deliver(Persistent("b"), testActor)
defaultTestChannel ! EnableDelivery
defaultTestChannel ! Deliver(Persistent("c"), testActor)
expectMsgPF() { case m @ ConfirmablePersistent("b", _, _) m.confirm() }
expectMsgPF() { case m @ ConfirmablePersistent("c", _, _) m.confirm() }
}
"support Persistent replies to Deliver senders" in {
val channel1 = system.actorOf(PersistentChannel.props(s"${name}-with-reply", PersistentChannelSettings(replyPersistent = true)))
channel1 ! Deliver(Persistent("a"), system.deadLetters)
expectMsgPF() { case Persistent("a", 1) }
channel1 ! Deliver(PersistentRepr("b", sequenceNr = 13), system.deadLetters)
expectMsgPF() { case Persistent("b", 13) }
system.stop(channel1)
}
"must not modify certain persistent message field" in {
val persistent1 = PersistentRepr(payload = "a", processorId = "p1", confirms = List("c1", "c2"), sender = defaultTestChannel, sequenceNr = 13)
val persistent2 = PersistentRepr(payload = "b", processorId = "p1", confirms = List("c1", "c2"), sender = defaultTestChannel)
defaultTestChannel ! Deliver(persistent1, testActor)
defaultTestChannel ! Deliver(persistent2, testActor)
expectMsgPF() { case cp @ ConfirmablePersistentImpl("a", 13, "p1", _, _, _, Seq("c1", "c2"), _, _, channel) cp.confirm() }
expectMsgPF() { case cp @ ConfirmablePersistentImpl("b", 2, "p1", _, _, _, Seq("c1", "c2"), _, _, channel) cp.confirm() }
}
}
"A persistent channel" when {
"used standalone" must {
"redeliver un-confirmed stored messages during recovery" in {
val confirmProbe = TestProbe()
val forwardProbe = TestProbe()
subscribeToConfirmation(confirmProbe)
val channel1 = createDefaultTestChannel()
channel1 tell (Deliver(Persistent("a1"), forwardProbe.ref), null)
channel1 tell (Deliver(Persistent("a2"), forwardProbe.ref), null)
forwardProbe.expectMsgPF() { case m @ ConfirmablePersistent("a1", _, _) /* no confirmation */ }
forwardProbe.expectMsgPF() { case m @ ConfirmablePersistent("a2", _, _) m.confirm() }
awaitConfirmation(confirmProbe)
system.stop(channel1)
val channel2 = createDefaultTestChannel()
channel2 tell (Deliver(Persistent("a3"), forwardProbe.ref), null)
forwardProbe.expectMsgPF() { case m @ ConfirmablePersistent("a1", _, _) m.confirm() } // sender still valid, no need to resolve
forwardProbe.expectMsgPF() { case m @ ConfirmablePersistent("a3", _, _) m.confirm() }
awaitConfirmation(confirmProbe)
awaitConfirmation(confirmProbe)
system.stop(channel2)
}
}
}
}
class LeveldbPersistentChannelSpec extends PersistentChannelSpec(PersistenceSpec.config("leveldb", "persistent-channel"))
class InmemPersistentChannelSpec extends PersistentChannelSpec(PersistenceSpec.config("inmem", "persistent-channel"))

View file

@ -0,0 +1,162 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import scala.concurrent.duration._
import scala.language.postfixOps
import com.typesafe.config._
import akka.actor._
import akka.testkit._
import akka.persistence.JournalProtocol.Confirm
object ProcessorChannelSpec {
class TestProcessor(name: String) extends NamedProcessor(name) {
val destination = context.actorOf(Props[TestDestination])
val channel = context.actorOf(Channel.props(s"${name}-channel"))
def receive = {
case m @ Persistent(s: String, _) if s.startsWith("a")
// forward to destination via channel,
// destination replies to initial sender
channel forward Deliver(m.withPayload(s"fw: ${s}"), destination)
case m @ Persistent(s: String, _) if s.startsWith("b")
// reply to sender via channel
channel ! Deliver(m.withPayload(s"re: ${s}"), sender)
}
}
class TestDestination extends Actor {
def receive = {
case m: Persistent sender ! m
}
}
class ResendingProcessor(name: String, destination: ActorRef) extends NamedProcessor(name) {
val channel = context.actorOf(Channel.props("channel", ChannelSettings(redeliverMax = 1, redeliverInterval = 100 milliseconds)))
def receive = {
case p: Persistent channel ! Deliver(p, destination)
case "replay" throw new TestException("replay requested")
}
}
class ResendingEventsourcedProcessor(name: String, destination: ActorRef) extends NamedProcessor(name) with EventsourcedProcessor {
val channel = context.actorOf(Channel.props("channel", ChannelSettings(redeliverMax = 1, redeliverInterval = 100 milliseconds)))
var events: List[String] = Nil
def handleEvent(event: String) = {
events = event :: events
channel ! Deliver(Persistent(event), destination)
}
def receiveReplay: Receive = {
case event: String handleEvent(event)
}
def receiveCommand: Receive = {
case "cmd" persist("evt")(handleEvent)
case "replay" throw new TestException("replay requested")
}
}
}
abstract class ProcessorChannelSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender {
import ProcessorChannelSpec._
private var processor: ActorRef = _
override protected def beforeEach: Unit = {
super.beforeEach()
setupTestProcessorData()
processor = createTestProcessor()
}
override protected def afterEach(): Unit = {
system.stop(processor)
super.afterEach()
}
def subscribeToConfirmation(probe: TestProbe): Unit =
system.eventStream.subscribe(probe.ref, classOf[Confirm])
def awaitConfirmation(probe: TestProbe): Unit =
probe.expectMsgType[Confirm]
def createTestProcessor(): ActorRef =
system.actorOf(Props(classOf[TestProcessor], name))
def setupTestProcessorData(): Unit = {
val confirmProbe = TestProbe()
val forwardProbe = TestProbe()
val replyProbe = TestProbe()
val processor = createTestProcessor()
subscribeToConfirmation(confirmProbe)
processor tell (Persistent("a1"), forwardProbe.ref)
processor tell (Persistent("b1"), replyProbe.ref)
forwardProbe.expectMsgPF() { case m @ ConfirmablePersistent("fw: a1", _, _) m.confirm() }
replyProbe.expectMsgPF() { case m @ ConfirmablePersistent("re: b1", _, _) m.confirm() }
awaitConfirmation(confirmProbe)
awaitConfirmation(confirmProbe)
system.stop(processor)
}
"A processor that uses a channel" can {
"forward new messages to destination" in {
processor ! Persistent("a2")
expectMsgPF() { case m @ ConfirmablePersistent("fw: a2", _, _) m.confirm() }
}
"reply new messages to senders" in {
processor ! Persistent("b2")
expectMsgPF() { case m @ ConfirmablePersistent("re: b2", _, _) m.confirm() }
}
"resend unconfirmed messages on restart" in {
val probe = TestProbe()
val p = system.actorOf(Props(classOf[ResendingProcessor], "rp", probe.ref))
p ! Persistent("a")
probe.expectMsgPF() { case cp @ ConfirmablePersistent("a", 1L, 0) }
probe.expectMsgPF() { case cp @ ConfirmablePersistent("a", 1L, 1) }
probe.expectNoMsg(200 milliseconds)
p ! "replay"
probe.expectMsgPF() { case cp @ ConfirmablePersistent("a", 1L, 0) }
probe.expectMsgPF() { case cp @ ConfirmablePersistent("a", 1L, 1) cp.confirm() }
}
}
"An eventsourced processor that uses a channel" can {
"reliably deliver events" in {
val probe = TestProbe()
val ep = system.actorOf(Props(classOf[ResendingEventsourcedProcessor], "rep", probe.ref))
ep ! "cmd"
probe.expectMsgPF() { case cp @ ConfirmablePersistent("evt", 1L, 0) }
probe.expectMsgPF() { case cp @ ConfirmablePersistent("evt", 1L, 1) }
probe.expectNoMsg(200 milliseconds)
ep ! "replay"
probe.expectMsgPF() { case cp @ ConfirmablePersistent("evt", 1L, 0) }
probe.expectMsgPF() { case cp @ ConfirmablePersistent("evt", 1L, 1) cp.confirm() }
}
}
}
class LeveldbProcessorChannelSpec extends ProcessorChannelSpec(PersistenceSpec.config("leveldb", "channel"))
class InmemProcessorChannelSpec extends ProcessorChannelSpec(PersistenceSpec.config("inmem", "channel"))

View file

@ -10,6 +10,7 @@ import com.typesafe.config._
import akka.actor._
import akka.persistence._
import akka.persistence.JournalProtocol.Confirm
import akka.serialization._
import akka.testkit._
@ -75,7 +76,7 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(config(customSerializers
"A message serializer" when {
"not given a manifest" must {
"handle custom ConfirmablePersistent message serialization" in {
val persistent = PersistentRepr(MyPayload("a"), 13, "p1", true, true, List("c1", "c2"), confirmable = true, Confirm("p2", 14, "c2"), testActor, testActor)
val persistent = PersistentRepr(MyPayload("a"), 13, "p1", true, true, 3, List("c1", "c2"), confirmable = true, Confirm("p2", 14, "c2"), testActor, testActor)
val serializer = serialization.findSerializerFor(persistent)
val bytes = serializer.toBinary(persistent)
@ -84,7 +85,7 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(config(customSerializers
deserialized must be(persistent.withPayload(MyPayload(".a.")))
}
"handle custom Persistent message serialization" in {
val persistent = PersistentRepr(MyPayload("a"), 13, "p1", true, true, List("c1", "c2"), confirmable = false, Confirm("p2", 14, "c2"), testActor, testActor)
val persistent = PersistentRepr(MyPayload("a"), 13, "p1", true, true, 0, List("c1", "c2"), confirmable = false, Confirm("p2", 14, "c2"), testActor, testActor)
val serializer = serialization.findSerializerFor(persistent)
val bytes = serializer.toBinary(persistent)
@ -95,7 +96,7 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(config(customSerializers
}
"given a PersistentRepr manifest" must {
"handle custom ConfirmablePersistent message serialization" in {
val persistent = PersistentRepr(MyPayload("b"), 13, "p1", true, true, List("c1", "c2"), confirmable = true, Confirm("p2", 14, "c2"), testActor, testActor)
val persistent = PersistentRepr(MyPayload("b"), 13, "p1", true, true, 3, List("c1", "c2"), confirmable = true, Confirm("p2", 14, "c2"), testActor, testActor)
val serializer = serialization.findSerializerFor(persistent)
val bytes = serializer.toBinary(persistent)
@ -104,7 +105,7 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(config(customSerializers
deserialized must be(persistent.withPayload(MyPayload(".b.")))
}
"handle custom Persistent message serialization" in {
val persistent = PersistentRepr(MyPayload("b"), 13, "p1", true, true, List("c1", "c2"), confirmable = true, Confirm("p2", 14, "c2"), testActor, testActor)
val persistent = PersistentRepr(MyPayload("b"), 13, "p1", true, true, 3, List("c1", "c2"), confirmable = true, Confirm("p2", 14, "c2"), testActor, testActor)
val serializer = serialization.findSerializerFor(persistent)
val bytes = serializer.toBinary(persistent)
@ -137,9 +138,9 @@ object MessageSerializerRemotingSpec {
class RemoteActor extends Actor {
def receive = {
case PersistentBatch(Persistent(MyPayload(data), _) +: tail) sender ! s"b${data}"
case ConfirmablePersistent(MyPayload(data), _) sender ! s"c${data}"
case ConfirmablePersistent(MyPayload(data), _, _) sender ! s"c${data}"
case Persistent(MyPayload(data), _) sender ! s"p${data}"
case Confirm(pid, snr, cid) sender ! s"${pid},${snr},${cid}"
case p @ Confirm(pid, msnr, cid, wsnr, ep) sender ! s"${pid},${msnr},${cid},${wsnr},${ep.path.name.startsWith("testActor")}"
}
}
@ -176,8 +177,8 @@ class MessageSerializerRemotingSpec extends AkkaSpec(config(systemA).withFallbac
expectMsg("b.a.")
}
"serialize Confirm messages during remoting" in {
localActor ! Confirm("a", 2, "b")
expectMsg("a,2,b")
localActor ! Confirm("a", 2, "b", 3, testActor)
expectMsg("a,2,b,3,true")
}
}
}

View file

@ -16,7 +16,7 @@ object ConversationRecoveryExample extends App {
var counter = 0
def receive = {
case m @ ConfirmablePersistent(Ping, _)
case m @ ConfirmablePersistent(Ping, _, _)
counter += 1
println(s"received ping ${counter} times ...")
m.confirm()
@ -33,7 +33,7 @@ object ConversationRecoveryExample extends App {
var counter = 0
def receive = {
case m @ ConfirmablePersistent(Pong, _)
case m @ ConfirmablePersistent(Pong, _, _)
counter += 1
println(s"received pong ${counter} times ...")
m.confirm()

View file

@ -24,7 +24,7 @@ object ProcessorChannelExample extends App {
class ExampleDestination extends Actor {
def receive = {
case p @ ConfirmablePersistent(payload, snr)
case p @ ConfirmablePersistent(payload, snr, _)
println(s"received ${payload}")
sender ! s"re: ${payload} (${snr})"
p.confirm()