!per #3707 Channel enhancements

- Persistent channel
- ConfirmablePersistent message type delivered by channel
- Sender resolution performance improvements
   * unstash() instead of unstashAll()

These enhancements required the following changes

- Unified implementation of processor stash and user stash
- Persistence message plugin API separated from implementation
- Physical deletion of messages
This commit is contained in:
Martin Krasser 2013-11-07 10:45:02 +01:00
parent 8fb59a0bc6
commit ba9fc4da46
41 changed files with 2167 additions and 722 deletions

View file

@ -3,9 +3,10 @@
*/ */
package akka.actor package akka.actor
import akka.dispatch.{ UnboundedDequeBasedMessageQueueSemantics, RequiresMessageQueue, Envelope, DequeBasedMessageQueueSemantics } import scala.collection.immutable
import akka.AkkaException import akka.AkkaException
import akka.dispatch.Mailboxes import akka.dispatch.{ UnboundedDequeBasedMessageQueueSemantics, RequiresMessageQueue, Envelope, DequeBasedMessageQueueSemantics, Mailboxes }
/** /**
* The `Stash` trait enables an actor to temporarily stash away messages that can not or * The `Stash` trait enables an actor to temporarily stash away messages that can not or
@ -60,7 +61,59 @@ trait UnboundedStash extends UnrestrictedStash with RequiresMessageQueue[Unbound
* A version of [[akka.actor.Stash]] that does not enforce any mailbox type. The proper mailbox has to be configured * A version of [[akka.actor.Stash]] that does not enforce any mailbox type. The proper mailbox has to be configured
* manually, and the mailbox should extend the [[akka.dispatch.DequeBasedMessageQueueSemantics]] marker trait. * manually, and the mailbox should extend the [[akka.dispatch.DequeBasedMessageQueueSemantics]] marker trait.
*/ */
trait UnrestrictedStash extends Actor { trait UnrestrictedStash extends Actor with StashSupport {
/**
* Overridden callback. Prepends all messages in the stash to the mailbox,
* clears the stash, stops all children and invokes the postStop() callback.
*/
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
try unstashAll() finally super.preRestart(reason, message)
}
/**
* Overridden callback. Prepends all messages in the stash to the mailbox and clears the stash.
* Must be called when overriding this method, otherwise stashed messages won't be propagated to DeadLetters
* when actor stops.
*/
override def postStop(): Unit = try unstashAll() finally super.postStop()
}
/**
* INTERNAL API.
*
* A factory for creating stashes for an actor instance.
*
* @see [[StashSupport]]
*/
private[akka] trait StashFactory { this: Actor
private[akka] def createStash()(implicit ctx: ActorContext, ref: ActorRef): StashSupport = new StashSupport {
def context: ActorContext = ctx
def self: ActorRef = ref
}
}
/**
* INTERNAL API.
*
* Support trait for implementing a stash for an actor instance. A default stash per actor (= user stash)
* is maintained by [[UnrestrictedStash]] by extending this trait. Actors that explicitly need other stashes
* (optionally in addition to and isolated from the user stash) can create new stashes via [[StashFactory]].
*/
private[akka] trait StashSupport {
/**
* INTERNAL API.
*
* Context of the actor that uses this stash.
*/
private[akka] def context: ActorContext
/**
* INTERNAL API.
*
* Self reference of the actor that uses this stash.
*/
private[akka] def self: ActorRef
/* The private stash of the actor. It is only accessible using `stash()` and /* The private stash of the actor. It is only accessible using `stash()` and
* `unstashAll()`. * `unstashAll()`.
*/ */
@ -110,6 +163,29 @@ trait UnrestrictedStash extends Actor {
else throw new StashOverflowException("Couldn't enqueue message " + currMsg + " to stash of " + self) else throw new StashOverflowException("Couldn't enqueue message " + currMsg + " to stash of " + self)
} }
/**
* Prepends `others` to this stash.
*/
private[akka] def prepend(others: immutable.Seq[Envelope]): Unit =
others.reverseIterator.foreach(env theStash = env +: theStash)
/**
* Prepends the oldest message in the stash to the mailbox, and then removes that
* message from the stash.
*
* Messages from the stash are enqueued to the mailbox until the capacity of the
* mailbox (if any) has been reached. In case a bounded mailbox overflows, a
* `MessageQueueAppendFailedException` is thrown.
*
* The unstashed message is guaranteed to be removed from the stash regardless
* if the `unstash()` call successfully returns or throws an exception.
*/
private[akka] def unstash(): Unit = if (theStash.nonEmpty) try {
mailbox.enqueueFirst(self, theStash.head)
} finally {
theStash = theStash.tail
}
/** /**
* Prepends all messages in the stash to the mailbox, and then clears the stash. * Prepends all messages in the stash to the mailbox, and then clears the stash.
* *
@ -155,22 +231,6 @@ trait UnrestrictedStash extends Actor {
theStash = Vector.empty[Envelope] theStash = Vector.empty[Envelope]
stashed stashed
} }
/**
* Overridden callback. Prepends all messages in the stash to the mailbox,
* clears the stash, stops all children and invokes the postStop() callback.
*/
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
try unstashAll() finally super.preRestart(reason, message)
}
/**
* Overridden callback. Prepends all messages in the stash to the mailbox and clears the stash.
* Must be called when overriding this method, otherwise stashed messages won't be propagated to DeadLetters
* when actor stops.
*/
override def postStop(): Unit = try unstashAll() finally super.postStop()
} }
/** /**

View file

@ -142,8 +142,8 @@ public class PersistenceDocTest {
class MyDestination extends UntypedActor { class MyDestination extends UntypedActor {
public void onReceive(Object message) throws Exception { public void onReceive(Object message) throws Exception {
if (message instanceof Persistent) { if (message instanceof ConfirmablePersistent) {
Persistent p = (Persistent)message; ConfirmablePersistent p = (ConfirmablePersistent)message;
System.out.println("received " + p.payload()); System.out.println("received " + p.payload());
p.confirm(); p.confirm();
} }
@ -266,4 +266,19 @@ public class PersistenceDocTest {
} }
//#batch-write //#batch-write
}; };
static Object o7 = new Object() {
abstract class MyProcessor extends UntypedProcessor {
ActorRef destination;
public void foo() {
//#persistent-channel-example
final ActorRef channel = getContext().actorOf(PersistentChannel.props(),
"myPersistentChannel");
channel.tell(Deliver.create(Persistent.create("example"), destination), getSelf());
//#persistent-channel-example
}
}
};
} }

View file

@ -36,22 +36,22 @@ public class PersistencePluginDocTest {
class MyAsyncJournal extends AsyncWriteJournal { class MyAsyncJournal extends AsyncWriteJournal {
@Override @Override
public Future<Long> doReplayAsync(String processorId, long fromSequenceNr, long toSequenceNr, Procedure<PersistentImpl> replayCallback) { public Future<Long> doReplayAsync(String processorId, long fromSequenceNr, long toSequenceNr, Procedure<PersistentRepr> replayCallback) {
return null; return null;
} }
@Override @Override
public Future<Void> doWriteAsync(PersistentImpl persistent) { public Future<Void> doWriteAsync(PersistentRepr persistent) {
return null; return null;
} }
@Override @Override
public Future<Void> doWriteBatchAsync(Iterable<PersistentImpl> persistentBatch) { public Future<Void> doWriteBatchAsync(Iterable<PersistentRepr> persistentBatch) {
return null; return null;
} }
@Override @Override
public Future<Void> doDeleteAsync(PersistentImpl persistent) { public Future<Void> doDeleteAsync(String processorId, long sequenceNr, boolean physical) {
return null; return null;
} }

View file

@ -57,6 +57,9 @@ Architecture
* *Snapshot store*: A snapshot store persists snapshots of a processor's internal state. Snapshots are used for * *Snapshot store*: A snapshot store persists snapshots of a processor's internal state. Snapshots are used for
optimizing recovery times. The storage backend of a snapshot store is pluggable. optimizing recovery times. The storage backend of a snapshot store is pluggable.
* *Event sourcing*. Based on the building blocks described above, Akka persistence provides abstractions for the
development of event sourced applications (see section :ref:`event-sourcing-java`)
Configuration Configuration
============= =============
@ -115,7 +118,8 @@ If not overridden, ``preStart`` sends a ``Recover`` message to ``getSelf()``. Ap
.. includecode:: code/docs/persistence/PersistenceDocTest.java#recover-on-start-custom .. includecode:: code/docs/persistence/PersistenceDocTest.java#recover-on-start-custom
Automated recovery on restart can be disabled by overriding ``preRestart`` with an empty implementation. Upper sequence number bounds can be used to recover a processor to past state instead of current state. Automated
recovery on restart can be disabled by overriding ``preRestart`` with an empty implementation.
.. includecode:: code/docs/persistence/PersistenceDocTest.java#recover-on-restart-disabled .. includecode:: code/docs/persistence/PersistenceDocTest.java#recover-on-restart-disabled
@ -132,10 +136,19 @@ Failure handling
^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^
A persistent message that caused an exception will be received again by a processor after restart. To prevent A persistent message that caused an exception will be received again by a processor after restart. To prevent
a replay of that message during recovery it can be marked as deleted. a replay of that message during recovery it can be deleted.
.. includecode:: code/docs/persistence/PersistenceDocTest.java#deletion .. includecode:: code/docs/persistence/PersistenceDocTest.java#deletion
Message deletion
----------------
A processor can delete messages by calling the ``delete`` method with a ``Persistent`` message object or a
sequence number as argument. An optional ``physical`` parameter specifies whether the message shall be
physically deleted from the journal or only marked as deleted. In both cases, the message won't be replayed.
Later extensions to Akka persistence will allow to replay messages that have been marked as deleted which can
be useful for debugging purposes, for example.
Identifiers Identifiers
----------- -----------
@ -152,7 +165,7 @@ should override ``processorId``.
.. includecode:: code/docs/persistence/PersistenceDocTest.java#processor-id-override .. includecode:: code/docs/persistence/PersistenceDocTest.java#processor-id-override
Later versions of the Akka persistence module will likely offer a possibility to migrate processor ids. Later versions of Akka persistence will likely offer a possibility to migrate processor ids.
Channels Channels
======== ========
@ -166,21 +179,57 @@ message is retained by a channel if its previous delivery has been confirmed by
A channel is ready to use once it has been created, no recovery or further activation is needed. A ``Deliver`` A channel is ready to use once it has been created, no recovery or further activation is needed. A ``Deliver``
request instructs a channel to send a ``Persistent`` message to a destination where the sender of the ``Deliver`` request instructs a channel to send a ``Persistent`` message to a destination where the sender of the ``Deliver``
request is forwarded to the destination. A processor may also reply to a message sender directly by using request is forwarded to the destination. A processor may also reply to a message sender directly by using
``getSender()`` as channel destination. ``getSender()`` as channel destination (not shown).
.. includecode:: code/docs/persistence/PersistenceDocTest.java#channel-example-reply .. includecode:: code/docs/persistence/PersistenceDocTest.java#channel-example-reply
Channel destinations confirm the delivery of a ``Persistent`` message by calling its ``confirm()`` method. This Persistent messages delivered by a channel are of type ``ConfirmablePersistent``. It extends ``Persistent`` and
(asynchronously) writes a confirmation entry to the journal. Replayed messages internally contain these confirmation adds a ``confirm()`` method. Channel destinations confirm the delivery of a ``ConfirmablePersistent`` message by
entries which allows a channel to decide if a message should be retained or not. calling ``confirm()``. This (asynchronously) writes a confirmation entry to the journal. Replayed messages
internally contain these confirmation entries which allows a channel to decide if a message should be retained or
not. ``ConfirmablePersistent`` messages can be used whereever ``Persistent`` messages are expected, which allows
processors to be used as channel destinations, for example.
Message re-delivery
-------------------
If an application crashes after a destination called ``confirm()`` but before the confirmation entry could have If an application crashes after a destination called ``confirm()`` but before the confirmation entry could have
been written to the journal then the unconfirmed message will be delivered again during next recovery and it is been written to the journal then the unconfirmed message will be re-delivered during next recovery of the sending
the destination's responsibility to detect the duplicate or simply process the message again if it's an idempotent processor. It is the destination's responsibility to detect the duplicate or simply process the message again if
receiver. Duplicates can be detected, for example, by tracking sequence numbers. it's an idempotent receiver. Duplicates can be detected, for example, by tracking sequence numbers.
Currently, channels do not store ``Deliver`` requests or retry delivery on network or destination failures. This Although a channel prevents message loss in case of sender (JVM) crashes it doesn't attempt re-deliveries if a
feature (*reliable channels*) will be available soon. destination is unavailable. To achieve reliable communication with a (remote) target, a channel destination may
want to use the :ref:`reliable-proxy` or add the message to a queue that is managed by a third party message
broker, for example. In latter case, the channel destination will first add the received message to the queue
and then call ``confirm()`` on the received ``ConfirmablePersistent`` message.
Persistent channels
-------------------
Channels created with ``Channel.props`` do not persist messages. This is not necessary because these (transient)
channels shall only be used in combination with a sending processor that takes care of message persistence.
However, if an application wants to use a channel standalone (without a sending processor), to prevent message
loss in case of a sender (JVM) crash, it should use a persistent channel which can be created with ``PersistentChannel.props``.
A persistent channel additionally persists messages before they are delivered. Persistence is achieved by an
internal processor that delegates delivery to a transient channel. A persistent channel, when used standalone,
can therefore provide the same message re-delivery semantics as a transient channel in combination with an
application-defined processor.
.. includecode:: code/docs/persistence/PersistenceDocTest.java#persistent-channel-example
By default, a persistent channel doesn't reply whether a ``Persistent`` message, sent with ``Deliver``, has been
successfully persisted or not. This can be enabled by creating the channel with the ``persistentReply`` parameter
set to ``true``: ``PersistentChannel.props(true)``. With this setting, either the successfully persisted message
is replied to the sender or a ``PersistenceFailure``. In case of a persistence failure, the sender should re-send
the message.
Using a persistent channel in combination with an application-defined processor can make sense if destinations are
unavailable for a long time and an application doesn't want to buffer all messages in memory (but write them to the
journal instead). In this case, delivery can be disabled with ``DisableDelivery`` (to stop delivery and persist-only)
and re-enabled with ``EnableDelivery``. A disabled channel that receives ``EnableDelivery`` will restart itself and
re-deliver all persisted, unconfirmed messages before serving new ``Deliver`` requests.
Sender resolution Sender resolution
----------------- -----------------
@ -208,7 +257,8 @@ Identifiers
In the same way as :ref:`processors-java`, channels also have an identifier that defaults to a channel's path. A channel In the same way as :ref:`processors-java`, channels also have an identifier that defaults to a channel's path. A channel
identifier can therefore be customized by using a custom actor name at channel creation. As already mentioned, this identifier can therefore be customized by using a custom actor name at channel creation. As already mentioned, this
works well when using local actor references but may cause problems with remote actor references. In this case, an works well when using local actor references but may cause problems with remote actor references. In this case, an
application-defined channel id should be provided as argument to ``Channel.props(String)`` application-defined channel id should be provided as argument to ``Channel.props(String)`` or
``PersistentChannel.props(String)``.
.. includecode:: code/docs/persistence/PersistenceDocTest.java#channel-id-override .. includecode:: code/docs/persistence/PersistenceDocTest.java#channel-id-override
@ -234,8 +284,8 @@ Sequence number
--------------- ---------------
The sequence number of a ``Persistent`` message can be obtained via its ``sequenceNr`` method. Persistent The sequence number of a ``Persistent`` message can be obtained via its ``sequenceNr`` method. Persistent
messages are assigned sequence numbers on a per-processor basis. A sequence starts at ``1L`` and doesn't contain messages are assigned sequence numbers on a per-processor basis (or per persistent channel basis if used
gaps unless a processor marks a message as deleted. standalone). A sequence starts at ``1L`` and doesn't contain gaps unless a processor deletes a message.
.. _snapshots-java: .. _snapshots-java:
@ -331,8 +381,9 @@ Applications may also send a batch of ``Persistent`` messages to a processor via
received by the processor separately (as ``Persistent`` messages). They are also replayed separately. Batch writes received by the processor separately (as ``Persistent`` messages). They are also replayed separately. Batch writes
can not only increase the throughput of a processor but may also be necessary for consistency reasons. For example, can not only increase the throughput of a processor but may also be necessary for consistency reasons. For example,
in :ref:`event-sourcing-java`, all events that are generated and persisted by a single command are batch-written to in :ref:`event-sourcing-java`, all events that are generated and persisted by a single command are batch-written to
the journal. The recovery of an ``UntypedEventsourcedProcessor`` will therefore never be done partially i.e. with the journal (even if ``persist`` is called multiple times per command). The recovery of an
only a subset of events persisted by a single command. ``UntypedEventsourcedProcessor`` will therefore never be done partially i.e. with only a subset of events persisted
by a single command.
Storage plugins Storage plugins
=============== ===============
@ -399,10 +450,3 @@ it must add
to the application configuration. If not specified, a default serializer is used, which is the ``JavaSerializer`` to the application configuration. If not specified, a default serializer is used, which is the ``JavaSerializer``
in this example. in this example.
Upcoming features
=================
* Reliable channels
* Extended deletion of messages and snapshots
* ...

View file

@ -117,7 +117,7 @@ trait PersistenceDocSpec {
class MyDestination extends Actor { class MyDestination extends Actor {
def receive = { def receive = {
case p @ Persistent(payload, _) { case p @ ConfirmablePersistent(payload, _) {
println(s"received ${payload}") println(s"received ${payload}")
p.confirm() p.confirm()
} }
@ -243,4 +243,17 @@ trait PersistenceDocSpec {
//#batch-write //#batch-write
system.shutdown() system.shutdown()
} }
new AnyRef {
import akka.actor._
trait MyActor extends Actor {
val destination: ActorRef = null
//#persistent-channel-example
val channel = context.actorOf(PersistentChannel.props(),
name = "myPersistentChannel")
channel ! Deliver(Persistent("example"), destination)
//#persistent-channel-example
}
}
} }

View file

@ -69,11 +69,11 @@ class PersistencePluginDocSpec extends WordSpec {
} }
class MyJournal extends AsyncWriteJournal { class MyJournal extends AsyncWriteJournal {
def writeAsync(persistent: PersistentImpl): Future[Unit] = ??? def writeAsync(persistent: PersistentRepr): Future[Unit] = ???
def writeBatchAsync(persistentBatch: Seq[PersistentImpl]): Future[Unit] = ??? def writeBatchAsync(persistentBatch: Seq[PersistentRepr]): Future[Unit] = ???
def deleteAsync(persistent: PersistentImpl): Future[Unit] = ??? def deleteAsync(processorId: String, sequenceNr: Long, physical: Boolean): Future[Unit] = ???
def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit] = ??? def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit] = ???
def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: (PersistentImpl) Unit): Future[Long] = ??? def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: (PersistentRepr) Unit): Future[Long] = ???
} }
class MySnapshotStore extends SnapshotStore { class MySnapshotStore extends SnapshotStore {

View file

@ -53,6 +53,9 @@ Architecture
* *Snapshot store*: A snapshot store persists snapshots of a processor's internal state. Snapshots are used for * *Snapshot store*: A snapshot store persists snapshots of a processor's internal state. Snapshots are used for
optimizing recovery times. The storage backend of a snapshot store is pluggable. optimizing recovery times. The storage backend of a snapshot store is pluggable.
* *Event sourcing*. Based on the building blocks described above, Akka persistence provides abstractions for the
development of event sourced applications (see section :ref:`event-sourcing`)
Configuration Configuration
============= =============
@ -110,7 +113,8 @@ If not overridden, ``preStart`` sends a ``Recover()`` message to ``self``. Appli
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recover-on-start-custom .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recover-on-start-custom
Automated recovery on restart can be disabled by overriding ``preRestart`` with an empty implementation. Upper sequence number bounds can be used to recover a processor to past state instead of current state. Automated
recovery on restart can be disabled by overriding ``preRestart`` with an empty implementation.
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recover-on-restart-disabled .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#recover-on-restart-disabled
@ -127,10 +131,19 @@ Failure handling
^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^
A persistent message that caused an exception will be received again by a processor after restart. To prevent A persistent message that caused an exception will be received again by a processor after restart. To prevent
a replay of that message during recovery it can be marked as deleted. a replay of that message during recovery it can be deleted.
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#deletion .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#deletion
Message deletion
----------------
A processor can delete messages by calling the ``delete`` method with a ``Persistent`` message object or a
sequence number as argument. An optional ``physical`` parameter specifies whether the message shall be
physically deleted from the journal or only marked as deleted. In both cases, the message won't be replayed.
Later extensions to Akka persistence will allow to replay messages that have been marked as deleted which can
be useful for debugging purposes, for example.
Identifiers Identifiers
----------- -----------
@ -147,7 +160,7 @@ should override ``processorId``.
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#processor-id-override .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#processor-id-override
Later versions of the Akka persistence module will likely offer a possibility to migrate processor ids. Later versions of Akka persistence will likely offer a possibility to migrate processor ids.
Channels Channels
======== ========
@ -161,21 +174,57 @@ message is retained by a channel if its previous delivery has been confirmed by
A channel is ready to use once it has been created, no recovery or further activation is needed. A ``Deliver`` A channel is ready to use once it has been created, no recovery or further activation is needed. A ``Deliver``
request instructs a channel to send a ``Persistent`` message to a destination where the sender of the ``Deliver`` request instructs a channel to send a ``Persistent`` message to a destination where the sender of the ``Deliver``
request is forwarded to the destination. A processor may also reply to a message sender directly by using ``sender`` request is forwarded to the destination. A processor may also reply to a message sender directly by using ``sender``
as channel destination. as channel destination (not shown).
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#channel-example-reply .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#channel-example-reply
Channel destinations confirm the delivery of a ``Persistent`` message by calling its ``confirm()`` method. This Persistent messages delivered by a channel are of type ``ConfirmablePersistent``. It extends ``Persistent`` and
(asynchronously) writes a confirmation entry to the journal. Replayed messages internally contain these confirmation adds a ``confirm()`` method. Channel destinations confirm the delivery of a ``ConfirmablePersistent`` message by
entries which allows a channel to decide if a message should be retained or not. calling ``confirm()``. This (asynchronously) writes a confirmation entry to the journal. Replayed messages
internally contain these confirmation entries which allows a channel to decide if a message should be retained or
not. ``ConfirmablePersistent`` messages can be used whereever ``Persistent`` messages are expected, which allows
processors to be used as channel destinations, for example.
Message re-delivery
-------------------
If an application crashes after a destination called ``confirm()`` but before the confirmation entry could have If an application crashes after a destination called ``confirm()`` but before the confirmation entry could have
been written to the journal then the unconfirmed message will be delivered again during next recovery and it is been written to the journal then the unconfirmed message will be re-delivered during next recovery of the sending
the destination's responsibility to detect the duplicate or simply process the message again if it's an idempotent processor. It is the destination's responsibility to detect the duplicate or simply process the message again if
receiver. Duplicates can be detected, for example, by tracking sequence numbers. it's an idempotent receiver. Duplicates can be detected, for example, by tracking sequence numbers.
Currently, channels do not store ``Deliver`` requests or retry delivery on network or destination failures. This Although a channel prevents message loss in case of sender (JVM) crashes it doesn't attempt re-deliveries if a
feature (*reliable channels*) will be available soon. destination is unavailable. To achieve reliable communication with a (remote) target, a channel destination may
want to use the :ref:`reliable-proxy` or add the message to a queue that is managed by a third party message
broker, for example. In latter case, the channel destination will first add the received message to the queue
and then call ``confirm()`` on the received ``ConfirmablePersistent`` message.
Persistent channels
-------------------
Channels created with ``Channel.props`` do not persist messages. This is not necessary because these (transient)
channels shall only be used in combination with a sending processor that takes care of message persistence.
However, if an application wants to use a channel standalone (without a sending processor), to prevent message
loss in case of a sender (JVM) crash, it should use a persistent channel which can be created with ``PersistentChannel.props``.
A persistent channel additionally persists messages before they are delivered. Persistence is achieved by an
internal processor that delegates delivery to a transient channel. A persistent channel, when used standalone,
can therefore provide the same message re-delivery semantics as a transient channel in combination with an
application-defined processor.
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#persistent-channel-example
By default, a persistent channel doesn't reply whether a ``Persistent`` message, sent with ``Deliver``, has been
successfully persisted or not. This can be enabled by creating the channel with
``PersistentChannel.props(persistentReply = true)``. With this setting, either the successfully persisted message
is replied to the sender or a ``PersistenceFailure``. In case of a persistence failure, the sender should re-send
the message.
Using a persistent channel in combination with an application-defined processor can make sense if destinations are
unavailable for a long time and an application doesn't want to buffer all messages in memory (but write them to the
journal instead). In this case, delivery can be disabled with ``DisableDelivery`` (to stop delivery and persist-only)
and re-enabled with ``EnableDelivery``. A disabled channel that receives ``EnableDelivery`` will restart itself and
re-deliver all persisted, unconfirmed messages before serving new ``Deliver`` requests.
Sender resolution Sender resolution
----------------- -----------------
@ -203,7 +252,8 @@ Identifiers
In the same way as :ref:`processors`, channels also have an identifier that defaults to a channel's path. A channel In the same way as :ref:`processors`, channels also have an identifier that defaults to a channel's path. A channel
identifier can therefore be customized by using a custom actor name at channel creation. As already mentioned, this identifier can therefore be customized by using a custom actor name at channel creation. As already mentioned, this
works well when using local actor references but may cause problems with remote actor references. In this case, an works well when using local actor references but may cause problems with remote actor references. In this case, an
application-defined channel id should be provided as argument to ``Channel.props(String)`` application-defined channel id should be provided as argument to ``Channel.props(String)`` or
``PersistentChannel.props(String)``.
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#channel-id-override .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#channel-id-override
@ -241,8 +291,8 @@ method or by pattern matching
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#sequence-nr-pattern-matching .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#sequence-nr-pattern-matching
Persistent messages are assigned sequence numbers on a per-processor basis. A sequence starts at ``1L`` and Persistent messages are assigned sequence numbers on a per-processor basis (or per persistent channel basis if used
doesn't contain gaps unless a processor marks a message as deleted. standalone). A sequence starts at ``1L`` and doesn't contain gaps unless a processor deletes a message.
.. _snapshots: .. _snapshots:
@ -342,8 +392,8 @@ Applications may also send a batch of ``Persistent`` messages to a processor via
received by the processor separately (as ``Persistent`` messages). They are also replayed separately. Batch writes received by the processor separately (as ``Persistent`` messages). They are also replayed separately. Batch writes
can not only increase the throughput of a processor but may also be necessary for consistency reasons. For example, can not only increase the throughput of a processor but may also be necessary for consistency reasons. For example,
in :ref:`event-sourcing`, all events that are generated and persisted by a single command are batch-written to the in :ref:`event-sourcing`, all events that are generated and persisted by a single command are batch-written to the
journal. The recovery of an ``EventsourcedProcessor`` will therefore never be done partially i.e. with only a subset journal (even if ``persist`` is called multiple times per command). The recovery of an ``EventsourcedProcessor``
of events persisted by a single command. will therefore never be done partially i.e. with only a subset of events persisted by a single command.
Storage plugins Storage plugins
=============== ===============
@ -420,10 +470,3 @@ State machines
State machines can be persisted by mixing in the ``FSM`` trait into processors. State machines can be persisted by mixing in the ``FSM`` trait into processors.
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#fsm-example .. includecode:: code/docs/persistence/PersistenceDocSpec.scala#fsm-example
Upcoming features
=================
* Reliable channels
* Extended deletion of messages and snapshots
* ...

View file

@ -7,22 +7,21 @@ package akka.persistence.journal.japi;
import scala.concurrent.Future; import scala.concurrent.Future;
import akka.japi.Procedure; import akka.japi.Procedure;
import akka.persistence.PersistentImpl; import akka.persistence.PersistentRepr;
interface AsyncReplayPlugin { interface AsyncReplayPlugin {
//#async-replay-plugin-api //#async-replay-plugin-api
/** /**
* Plugin Java API. * Java API, Plugin API: asynchronously replays persistent messages.
* * Implementations replay a message by calling `replayCallback`. The returned
* Asynchronously replays persistent messages. Implementations replay a message * future must be completed when all messages (matching the sequence number
* by calling `replayCallback`. The returned future must be completed when all * bounds) have been replayed. The future `Long` value must be the highest
* messages (matching the sequence number bounds) have been replayed. The future * stored sequence number in the journal for the specified processor. The
* `Long` value must be the highest stored sequence number in the journal for the * future must be completed with a failure if any of the persistent messages
* specified processor. The future must be completed with a failure if any of * could not be replayed.
* the persistent messages could not be replayed.
* *
* The `replayCallback` must also be called with messages that have been marked * The `replayCallback` must also be called with messages that have been marked
* as deleted. In this case a replayed message's `deleted` field must be set to * as deleted. In this case a replayed message's `deleted` method must return
* `true`. * `true`.
* *
* The channel ids of delivery confirmations that are available for a replayed * The channel ids of delivery confirmations that are available for a replayed
@ -34,6 +33,6 @@ interface AsyncReplayPlugin {
* @param replayCallback called to replay a single message. Can be called from any * @param replayCallback called to replay a single message. Can be called from any
* thread. * thread.
*/ */
Future<Long> doReplayAsync(String processorId, long fromSequenceNr, long toSequenceNr, Procedure<PersistentImpl> replayCallback); Future<Long> doReplayAsync(String processorId, long fromSequenceNr, long toSequenceNr, Procedure<PersistentRepr> replayCallback);
//#async-replay-plugin-api //#async-replay-plugin-api
} }

View file

@ -6,36 +6,32 @@ package akka.persistence.journal.japi;
import scala.concurrent.Future; import scala.concurrent.Future;
import akka.persistence.PersistentImpl; import akka.persistence.PersistentRepr;
interface AsyncWritePlugin { interface AsyncWritePlugin {
//#async-write-plugin-api //#async-write-plugin-api
/** /**
* Plugin Java API. * Java API, Plugin API: asynchronously writes a `persistent` message to the journal.
*
* Asynchronously writes a `persistent` message to the journal.
*/ */
Future<Void> doWriteAsync(PersistentImpl persistent); Future<Void> doWriteAsync(PersistentRepr persistent);
/** /**
* Plugin Java API. * Java API, Plugin API: asynchronously writes a batch of persistent messages to the
* * journal. The batch write must be atomic i.e. either all persistent messages in the
* Asynchronously writes a batch of persistent messages to the journal. The batch write * batch are written or none.
* must be atomic i.e. either all persistent messages in the batch are written or none.
*/ */
Future<Void> doWriteBatchAsync(Iterable<PersistentImpl> persistentBatch); Future<Void> doWriteBatchAsync(Iterable<PersistentRepr> persistentBatch);
/** /**
* Plugin Java API. * Java API, Plugin API: asynchronously deletes a persistent message. If `physical`
* * is set to `false`, the persistent message is marked as deleted, otherwise it is
* Asynchronously marks a `persistent` message as deleted. * physically deleted.
*/ */
Future<Void> doDeleteAsync(PersistentImpl persistent); Future<Void> doDeleteAsync(String processorId, long sequenceNr, boolean physical);
/** /**
* Plugin Java API. * Java API, Plugin API: asynchronously writes a delivery confirmation to the
* * journal.
* Asynchronously writes a delivery confirmation to the journal.
*/ */
Future<Void> doConfirmAsync(String processorId, long sequenceNr, String channelId); Future<Void> doConfirmAsync(String processorId, long sequenceNr, String channelId);
//#async-write-plugin-api //#async-write-plugin-api

View file

@ -4,36 +4,31 @@
package akka.persistence.journal.japi; package akka.persistence.journal.japi;
import akka.persistence.PersistentImpl; import akka.persistence.PersistentRepr;
interface SyncWritePlugin { interface SyncWritePlugin {
//#sync-write-plugin-api //#sync-write-plugin-api
/** /**
* Plugin Java API. * Java API, Plugin API: synchronously writes a `persistent` message to the journal.
*
* Synchronously writes a `persistent` message to the journal.
*/ */
void doWrite(PersistentImpl persistent) throws Exception; void doWrite(PersistentRepr persistent) throws Exception;
/** /**
* Plugin Java API. * Java API, Plugin API: synchronously writes a batch of persistent messages to the
* * journal. The batch write must be atomic i.e. either all persistent messages in the
* Synchronously writes a batch of persistent messages to the journal. The batch write * batch are written or none.
* must be atomic i.e. either all persistent messages in the batch are written or none.
*/ */
void doWriteBatch(Iterable<PersistentImpl> persistentBatch); void doWriteBatch(Iterable<PersistentRepr> persistentBatch);
/** /**
* Plugin Java API. * Java API, Plugin API: synchronously deletes a persistent message. If `physical`
* * is set to `false`, the persistent message is marked as deleted, otherwise it is
* Synchronously marks a `persistent` message as deleted. * physically deleted.
*/ */
void doDelete(PersistentImpl persistent) throws Exception; void doDelete(String processorId, long sequenceNr, boolean physical);
/** /**
* Plugin Java API. * Java API, Plugin API: synchronously writes a delivery confirmation to the journal.
*
* Synchronously writes a delivery confirmation to the journal.
*/ */
void doConfirm(String processorId, long sequenceNr, String channelId) throws Exception; void doConfirm(String processorId, long sequenceNr, String channelId) throws Exception;
//#sync-write-plugin-api //#sync-write-plugin-api

View file

@ -12,9 +12,7 @@ import akka.persistence.*;
interface SnapshotStorePlugin { interface SnapshotStorePlugin {
//#snapshot-store-plugin-api //#snapshot-store-plugin-api
/** /**
* Plugin Java API. * Java API, Plugin API: asynchronously loads a snapshot.
*
* Asynchronously loads a snapshot.
* *
* @param processorId processor id. * @param processorId processor id.
* @param criteria selection criteria for loading. * @param criteria selection criteria for loading.
@ -22,9 +20,7 @@ interface SnapshotStorePlugin {
Future<Option<SelectedSnapshot>> doLoadAsync(String processorId, SnapshotSelectionCriteria criteria); Future<Option<SelectedSnapshot>> doLoadAsync(String processorId, SnapshotSelectionCriteria criteria);
/** /**
* Plugin Java API. * Java API, Plugin API: asynchronously saves a snapshot.
*
* Asynchronously saves a snapshot.
* *
* @param metadata snapshot metadata. * @param metadata snapshot metadata.
* @param snapshot snapshot. * @param snapshot snapshot.
@ -32,18 +28,14 @@ interface SnapshotStorePlugin {
Future<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot); Future<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot);
/** /**
* Plugin Java API. * Java API, Plugin API: called after successful saving of a snapshot.
*
* Called after successful saving of a snapshot.
* *
* @param metadata snapshot metadata. * @param metadata snapshot metadata.
*/ */
void onSaved(SnapshotMetadata metadata) throws Exception; void onSaved(SnapshotMetadata metadata) throws Exception;
/** /**
* Plugin Java API. * Java API, Plugin API: deletes the snapshot identified by `metadata`.
*
* Deletes the snapshot identified by `metadata`.
* *
* @param metadata snapshot metadata. * @param metadata snapshot metadata.
*/ */

View file

@ -13,10 +13,10 @@ message PersistentMessage {
optional PersistentPayload payload = 1; optional PersistentPayload payload = 1;
optional int64 sequenceNr = 2; optional int64 sequenceNr = 2;
optional string processorId = 3; optional string processorId = 3;
optional string channelId = 4;
optional bool deleted = 5; optional bool deleted = 5;
optional bool resolved = 6; optional bool resolved = 6;
repeated string confirms = 8; repeated string confirms = 8;
optional bool confirmable = 11;
optional ConfirmMessage confirmMessage = 10; optional ConfirmMessage confirmMessage = 10;
optional string confirmTarget = 9; optional string confirmTarget = 9;
optional string sender = 7; optional string sender = 7;
@ -33,3 +33,15 @@ message ConfirmMessage {
optional int64 sequenceNr = 2; optional int64 sequenceNr = 2;
optional string channelId = 3; optional string channelId = 3;
} }
message DeliverMessage {
enum ResolveStrategy {
Off = 1;
Sender = 2;
Destination = 3;
}
optional PersistentMessage persistent = 1;
optional string destination = 2;
optional ResolveStrategy resolve = 3;
}

View file

@ -18,9 +18,7 @@ akka {
serialization-bindings { serialization-bindings {
"akka.persistence.serialization.Snapshot" = akka-persistence-snapshot "akka.persistence.serialization.Snapshot" = akka-persistence-snapshot
"akka.persistence.PersistentBatch" = akka-persistence-message "akka.persistence.serialization.Message" = akka-persistence-message
"akka.persistence.PersistentImpl" = akka-persistence-message
"akka.persistence.Confirm" = akka-persistence-message
} }
} }

View file

@ -4,13 +4,15 @@
package akka.persistence package akka.persistence
import akka.AkkaException
import akka.actor._ import akka.actor._
import akka.persistence.serialization.Message
/** /**
* A channel is used by [[Processor]]s for sending received persistent messages to destinations. * A channel is used by [[Processor]]s for sending [[Persistent]] messages to destinations. The main
* It prevents redundant delivery of messages to these destinations when a processor is recovered * responsibility of a channel is to prevent redundant delivery of replayed messages to destinations
* i.e. receives replayed messages. This requires that channel destinations confirm the receipt of * when a processor is recovered.
* persistent messages by calling `confirm()` on the [[Persistent]] message.
* *
* A channel can be instructed to deliver a persistent message to a `destination` via the [[Deliver]] * A channel can be instructed to deliver a persistent message to a `destination` via the [[Deliver]]
* command. * command.
@ -45,56 +47,69 @@ import akka.actor._
* } * }
* }}} * }}}
* *
* Redundant delivery of messages to destinations is only prevented if the receipt of these messages
* is explicitly confirmed. Therefore, persistent messages that are delivered via a channel are of type
* [[ConfirmablePersistent]]. Their receipt can be confirmed by a destination by calling the `confirm()`
* method on these messages.
*
* {{{
* class MyDestination extends Actor {
* def receive = {
* case cp @ ConfirmablePersistent(payload, sequenceNr) => cp.confirm()
* }
* }
* }}}
*
* A channel will only re-deliver messages if the sending processor is recovered and delivery of these
* messages has not been confirmed yet. Hence, a channel can be used to avoid message loss in case of
* sender JVM crashes, for example. A channel, however, does not attempt any re-deliveries should a
* destination be unavailable. Re-delivery to destinations (in case of network failures or destination
* JVM crashes) is an application-level concern and can be done by using a reliable proxy, for example.
*
* @see [[Deliver]] * @see [[Deliver]]
*/ */
class Channel private (_channelId: Option[String]) extends Actor with Stash { sealed class Channel private[akka] (_channelId: Option[String]) extends Actor with Stash {
private val extension = Persistence(context.system) private val extension = Persistence(context.system)
private val id = _channelId match { private val id = _channelId match {
case Some(cid) cid case Some(cid) cid
case None extension.channelId(self) case None extension.channelId(self)
} }
/**
* Creates a new channel with a generated channel id.
*/
def this() = this(None)
/**
* Creates a new channel with specified channel id.
*
* @param channelId channel id.
*/
def this(channelId: String) = this(Some(channelId))
import ResolvedDelivery._ import ResolvedDelivery._
private val delivering: Actor.Receive = { private val delivering: Actor.Receive = {
case Deliver(persistent: PersistentImpl, destination, resolve) { case Deliver(persistent: PersistentRepr, destination, resolve) {
if (!persistent.confirms.contains(id)) { if (!persistent.confirms.contains(id)) {
val msg = persistent.copy(channelId = id, val prepared = prepareDelivery(persistent)
confirmTarget = extension.journalFor(persistent.processorId),
confirmMessage = Confirm(persistent.processorId, persistent.sequenceNr, id))
resolve match { resolve match {
case Resolve.Sender if !persistent.resolved { case Resolve.Sender if !prepared.resolved {
context.actorOf(Props(classOf[ResolvedSenderDelivery], msg, destination, sender)) ! DeliverResolved context.actorOf(Props(classOf[ResolvedSenderDelivery], prepared, destination, sender)) ! DeliverResolved
context.become(buffering, false) context.become(buffering, false)
} }
case Resolve.Destination if !persistent.resolved { case Resolve.Destination if !prepared.resolved {
context.actorOf(Props(classOf[ResolvedDestinationDelivery], msg, destination, sender)) ! DeliverResolved context.actorOf(Props(classOf[ResolvedDestinationDelivery], prepared, destination, sender)) ! DeliverResolved
context.become(buffering, false) context.become(buffering, false)
} }
case _ destination tell (msg, sender) case _ destination tell (prepared, sender)
} }
} }
unstash()
} }
} }
private val buffering: Actor.Receive = { private val buffering: Actor.Receive = {
case DeliveredResolved | DeliveredUnresolved { context.unbecome(); unstashAll() } // TODO: optimize case DeliveredResolved | DeliveredUnresolved { context.unbecome(); unstash() }
case _: Deliver stash() case _: Deliver stash()
} }
def receive = delivering def receive = delivering
private[akka] def prepareDelivery(persistent: PersistentRepr): PersistentRepr = {
ConfirmablePersistentImpl(
persistent = persistent,
confirmTarget = extension.journalFor(persistent.processorId),
confirmMessage = Confirm(persistent.processorId, persistent.sequenceNr, id))
}
} }
object Channel { object Channel {
@ -102,7 +117,7 @@ object Channel {
* Returns a channel configuration object for creating a [[Channel]] with a * Returns a channel configuration object for creating a [[Channel]] with a
* generated id. * generated id.
*/ */
def props(): Props = Props(classOf[Channel]) def props(): Props = Props(classOf[Channel], None)
/** /**
* Returns a channel configuration object for creating a [[Channel]] with the * Returns a channel configuration object for creating a [[Channel]] with the
@ -110,12 +125,159 @@ object Channel {
* *
* @param channelId channel id. * @param channelId channel id.
*/ */
def props(channelId: String): Props = Props(classOf[Channel], channelId) def props(channelId: String): Props = Props(classOf[Channel], Some(channelId))
} }
/** /**
* Instructs a [[Channel]] to deliver `persistent` message to destination `destination`. * A [[PersistentChannel]] implements the same functionality as a [[Channel]] but additionally
* The `resolve` parameter can be: * persists messages before they are delivered. Therefore, the main use case of a persistent
* channel is standalone usage i.e. independent of a sending [[Processor]]. Messages that have
* been persisted by a persistent channel are deleted again when destinations confirm the receipt
* of these messages.
*
* Using a persistent channel in combination with a [[Processor]] can make sense if destinations
* are unavailable for a long time and an application doesn't want to buffer all messages in
* memory (but write them to a journal instead). In this case, delivery can be disabled with
* [[DisableDelivery]] (to stop delivery and persist-only) and re-enabled with [[EnableDelivery]].
*
* A persistent channel can also be configured to reply whether persisting a message was successful
* or not (see `PersistentChannel.props` methods). If enabled, the sender will receive the persisted
* message as reply (i.e. a [[Persistent]] message), otherwise a [[PersistenceFailure]] message.
*
* A persistent channel will only re-deliver un-confirmed, stored messages if it is started or re-
* enabled with [[EnableDelivery]]. Hence, a persistent channel can be used to avoid message loss
* in case of sender JVM crashes, for example. A channel, however, does not attempt any re-deliveries
* should a destination be unavailable. Re-delivery to destinations (in case of network failures or
* destination JVM crashes) is an application-level concern and can be done by using a reliable proxy,
* for example.
*/
final class PersistentChannel private[akka] (_channelId: Option[String], persistentReply: Boolean) extends EventsourcedProcessor {
override val processorId = _channelId.getOrElse(super.processorId)
private val journal = Persistence(context.system).journalFor(processorId)
private val channel = context.actorOf(Props(classOf[NoPrepChannel], processorId))
private var deliveryEnabled = true
def receiveReplay: Receive = {
case Deliver(persistent: PersistentRepr, destination, resolve) deliver(prepareDelivery(persistent), destination, resolve)
}
def receiveCommand: Receive = {
case d @ Deliver(persistent: PersistentRepr, destination, resolve) {
if (!persistent.confirms.contains(processorId)) {
persist(d) { _
val prepared = prepareDelivery(persistent)
if (persistent.processorId != PersistentRepr.Undefined)
journal ! Confirm(persistent.processorId, persistent.sequenceNr, processorId)
if (persistentReply)
sender ! prepared
if (deliveryEnabled)
deliver(prepared, destination, resolve)
}
}
}
case c: Confirm deleteMessage(c.sequenceNr, true)
case DisableDelivery deliveryEnabled = false
case EnableDelivery if (!deliveryEnabled) throw new ChannelRestartRequiredException
case p: PersistenceFailure if (persistentReply) sender ! p
}
private def prepareDelivery(persistent: PersistentRepr): PersistentRepr = currentPersistentMessage.map { current
val sequenceNr = if (persistent.sequenceNr == 0L) current.sequenceNr else persistent.sequenceNr
val resolved = persistent.resolved && current.asInstanceOf[PersistentRepr].resolved
persistent.update(sequenceNr = sequenceNr, resolved = resolved)
} getOrElse (persistent)
private def deliver(persistent: PersistentRepr, destination: ActorRef, resolve: Resolve.ResolveStrategy) = currentPersistentMessage.foreach { current
channel forward Deliver(persistent = ConfirmablePersistentImpl(persistent,
confirmTarget = self,
confirmMessage = Confirm(processorId, current.sequenceNr, PersistentRepr.Undefined)), destination, resolve)
}
}
object PersistentChannel {
/**
* Returns a channel configuration object for creating a [[PersistentChannel]] with a
* generated id. The sender will not receive persistence completion replies.
*/
def props(): Props = props(persistentReply = false)
/**
* Returns a channel configuration object for creating a [[PersistentChannel]] with a
* generated id.
*
* @param persistentReply if `true` the sender will receive the successfully stored
* [[Persistent]] message that has been submitted with a
* [[Deliver]] request, or a [[PersistenceFailure]] message
* in case of a persistence failure.
*/
def props(persistentReply: Boolean): Props = Props(classOf[PersistentChannel], None, persistentReply)
/**
* Returns a channel configuration object for creating a [[PersistentChannel]] with the
* specified id. The sender will not receive persistence completion replies.
*
* @param channelId channel id.
*/
def props(channelId: String): Props = props(channelId, persistentReply = false)
/**
* Returns a channel configuration object for creating a [[PersistentChannel]] with the
* specified id.
*
* @param channelId channel id.
* @param persistentReply if `true` the sender will receive the successfully stored
* [[Persistent]] message that has been submitted with a
* [[Deliver]] request, or a [[PersistenceFailure]] message
* in case of a persistence failure.
*/
def props(channelId: String, persistentReply: Boolean): Props = Props(classOf[PersistentChannel], Some(channelId), persistentReply)
}
/**
* Instructs a [[PersistentChannel]] to disable the delivery of [[Persistent]] messages to their destination.
* The persistent channel, however, continues to persist messages (for later delivery).
*
* @see [[EnableDelivery]]
*/
@SerialVersionUID(1L)
case object DisableDelivery {
/**
* Java API.
*/
def getInstance = this
}
/**
* Instructs a [[PersistentChannel]] to re-enable the delivery of [[Persistent]] messages to their destination.
* This will first deliver all messages that have been stored by a persistent channel for which no confirmation
* is available yet. New [[Deliver]] requests are processed after all stored messages have been delivered. This
* request only has an effect if a persistent channel has previously been disabled with [[DisableDelivery]].
*
* @see [[DisableDelivery]]
*/
@SerialVersionUID(1L)
case object EnableDelivery {
/**
* Java API.
*/
def getInstance = this
}
/**
* Thrown by a persistent channel when [[EnableDelivery]] has been requested and delivery has been previously
* disabled for that channel.
*/
@SerialVersionUID(1L)
class ChannelRestartRequiredException extends AkkaException("channel restart required for enabling delivery")
/**
* Instructs a [[Channel]] or [[PersistentChannel]] to deliver `persistent` message to
* destination `destination`. The `resolve` parameter can be:
* *
* - `Resolve.Destination`: will resolve a new destination reference from the specified * - `Resolve.Destination`: will resolve a new destination reference from the specified
* `destination`s path. The `persistent` message will be sent to the newly resolved * `destination`s path. The `persistent` message will be sent to the newly resolved
@ -160,7 +322,7 @@ object Channel {
* @param resolve resolve strategy. * @param resolve resolve strategy.
*/ */
@SerialVersionUID(1L) @SerialVersionUID(1L)
case class Deliver(persistent: Persistent, destination: ActorRef, resolve: Resolve.ResolveStrategy = Resolve.Off) case class Deliver(persistent: Persistent, destination: ActorRef, resolve: Resolve.ResolveStrategy = Resolve.Off) extends Message
object Deliver { object Deliver {
/** /**
@ -253,9 +415,9 @@ private object ResolvedDelivery {
* Resolves `destination` before sending `persistent` message to the resolved destination using * Resolves `destination` before sending `persistent` message to the resolved destination using
* the specified sender (`sdr`) as message sender. * the specified sender (`sdr`) as message sender.
*/ */
private class ResolvedDestinationDelivery(persistent: PersistentImpl, destination: ActorRef, sdr: ActorRef) extends ResolvedDelivery { private class ResolvedDestinationDelivery(persistent: PersistentRepr, destination: ActorRef, sdr: ActorRef) extends ResolvedDelivery {
val path = destination.path val path = destination.path
def onResolveSuccess(ref: ActorRef) = ref tell (persistent.copy(resolved = true), sdr) def onResolveSuccess(ref: ActorRef) = ref tell (persistent.update(resolved = true), sdr)
def onResolveFailure() = destination tell (persistent, sdr) def onResolveFailure() = destination tell (persistent, sdr)
} }
@ -263,9 +425,15 @@ private class ResolvedDestinationDelivery(persistent: PersistentImpl, destinatio
* Resolves `sdr` before sending `persistent` message to specified `destination` using * Resolves `sdr` before sending `persistent` message to specified `destination` using
* the resolved sender as message sender. * the resolved sender as message sender.
*/ */
private class ResolvedSenderDelivery(persistent: PersistentImpl, destination: ActorRef, sdr: ActorRef) extends ResolvedDelivery { private class ResolvedSenderDelivery(persistent: PersistentRepr, destination: ActorRef, sdr: ActorRef) extends ResolvedDelivery {
val path = sdr.path val path = sdr.path
def onResolveSuccess(ref: ActorRef) = destination tell (persistent.copy(resolved = true), ref) def onResolveSuccess(ref: ActorRef) = destination tell (persistent.update(resolved = true), ref)
def onResolveFailure() = destination tell (persistent, sdr) def onResolveFailure() = destination tell (persistent, sdr)
} }
/**
* [[Channel]] specialization used by [[PersistentChannel]] to deliver stored messages.
*/
private class NoPrepChannel(channelId: String) extends Channel(Some(channelId)) {
override private[akka] def prepareDelivery(persistent: PersistentRepr) = persistent
}

View file

@ -64,11 +64,11 @@ private[persistence] trait Eventsourced extends Processor {
private val persistingEvents: State = new State { private val persistingEvents: State = new State {
def aroundReceive(receive: Receive, message: Any) = message match { def aroundReceive(receive: Receive, message: Any) = message match {
case PersistentBatch(b) { case PersistentBatch(b) {
b.foreach(deleteMessage) b.foreach(p deleteMessage(p, true))
throw new UnsupportedOperationException("Persistent command batches not supported") throw new UnsupportedOperationException("Persistent command batches not supported")
} }
case p: PersistentImpl { case p: PersistentRepr {
deleteMessage(p) deleteMessage(p, true)
throw new UnsupportedOperationException("Persistent commands not supported") throw new UnsupportedOperationException("Persistent commands not supported")
} }
case WriteSuccess(p) if identical(p.payload, persistInvocations.head._1) { case WriteSuccess(p) if identical(p.payload, persistInvocations.head._1) {
@ -95,10 +95,10 @@ private[persistence] trait Eventsourced extends Processor {
} }
private var persistInvocations: List[(Any, Any Unit)] = Nil private var persistInvocations: List[(Any, Any Unit)] = Nil
private var persistentEventBatch: List[PersistentImpl] = Nil private var persistentEventBatch: List[PersistentRepr] = Nil
private var currentState: State = recovering private var currentState: State = recovering
private val processorStash = createProcessorStash private val processorStash = createStash()
/** /**
* Asynchronously persists `event`. On successful persistence, `handler` is called with the * Asynchronously persists `event`. On successful persistence, `handler` is called with the
@ -124,12 +124,13 @@ private[persistence] trait Eventsourced extends Processor {
*/ */
final def persist[A](event: A)(handler: A Unit): Unit = { final def persist[A](event: A)(handler: A Unit): Unit = {
persistInvocations = (event, handler.asInstanceOf[Any Unit]) :: persistInvocations persistInvocations = (event, handler.asInstanceOf[Any Unit]) :: persistInvocations
persistentEventBatch = PersistentImpl(event) :: persistentEventBatch persistentEventBatch = PersistentRepr(event) :: persistentEventBatch
} }
/** /**
* Asynchronously persists `events` in specified order. This is equivalent to calling * Asynchronously persists `events` in specified order. This is equivalent to calling
* `persist[A](event: A)(handler: A => Unit)` multiple times with the same `handler`. * `persist[A](event: A)(handler: A => Unit)` multiple times with the same `handler`,
* except that `events` are persisted atomically with this method.
* *
* @param events events to be persisted. * @param events events to be persisted.
* @param handler handler for each persisted `events` * @param handler handler for each persisted `events`
@ -211,9 +212,7 @@ trait EventsourcedProcessor extends Processor with Eventsourced {
} }
/** /**
* Java API. * Java API: an event sourced processor.
*
* An event sourced processor.
*/ */
abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Eventsourced { abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Eventsourced {
final def onReceive(message: Any) = initialBehavior(message) final def onReceive(message: Any) = initialBehavior(message)
@ -227,9 +226,7 @@ abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Events
} }
/** /**
* Java API. * Java API: asynchronously persists `event`. On successful persistence, `handler` is called with the
*
* Asynchronously persists `event`. On successful persistence, `handler` is called with the
* persisted event. It is guaranteed that no new commands will be received by a processor * persisted event. It is guaranteed that no new commands will be received by a processor
* between a call to `persist` and the execution of its `handler`. This also holds for * between a call to `persist` and the execution of its `handler`. This also holds for
* multiple `persist` calls per received command. Internally, this is achieved by stashing new * multiple `persist` calls per received command. Internally, this is achieved by stashing new
@ -254,10 +251,9 @@ abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Events
persist(event)(event handler(event)) persist(event)(event handler(event))
/** /**
* Java API. * Java API: asynchronously persists `events` in specified order. This is equivalent to calling
* * `persist[A](event: A, handler: Procedure[A])` multiple times with the same `handler`,
* Asynchronously persists `events` in specified order. This is equivalent to calling * except that `events` are persisted atomically with this method.
* `persist[A](event: A, handler: Procedure[A])` multiple times with the same `handler`.
* *
* @param events events to be persisted. * @param events events to be persisted.
* @param handler handler for each persisted `events` * @param handler handler for each persisted `events`
@ -266,9 +262,7 @@ abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Events
persist(Util.immutableSeq(events))(event handler(event)) persist(Util.immutableSeq(events))(event handler(event))
/** /**
* Java API. * Java API: replay handler that receives persisted events during recovery. If a state snapshot
*
* Replay handler that receives persisted events during recovery. If a state snapshot
* has been captured and saved, this handler will receive a [[SnapshotOffer]] message * has been captured and saved, this handler will receive a [[SnapshotOffer]] message
* followed by events that are younger than the offered snapshot. * followed by events that are younger than the offered snapshot.
* *
@ -281,9 +275,7 @@ abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Events
def onReceiveReplay(msg: Any): Unit def onReceiveReplay(msg: Any): Unit
/** /**
* Java API. * Java API: command handler. Typically validates commands against current state (and/or by
*
* Command handler. Typically validates commands against current state (and/or by
* communication with other actors). On successful validation, one or more events are * communication with other actors). On successful validation, one or more events are
* derived from a command and these events are then persisted by calling `persist`. * derived from a command and these events are then persisted by calling `persist`.
* Commands sent to event sourced processors must not be [[Persistent]] or * Commands sent to event sourced processors must not be [[Persistent]] or

View file

@ -13,12 +13,12 @@ import akka.actor._
*/ */
private[persistence] object JournalProtocol { private[persistence] object JournalProtocol {
/** /**
* Instructs a journal to mark the `persistent` message as deleted. * Instructs a journal to delete a persistent message identified by `processorId`
* A persistent message marked as deleted is not replayed during recovery. * and `sequenceNr`. If `physical` is set to `false`, the persistent message is
* * marked as deleted in the journal, otherwise it is physically deleted from the
* @param persistent persistent message. * journal.
*/ */
case class Delete(persistent: Persistent) case class Delete(processorId: String, sequenceNr: Long, physical: Boolean)
/** /**
* Instructs a journal to persist a sequence of messages. * Instructs a journal to persist a sequence of messages.
@ -26,7 +26,7 @@ private[persistence] object JournalProtocol {
* @param persistentBatch batch of messages to be persisted. * @param persistentBatch batch of messages to be persisted.
* @param processor requesting processor. * @param processor requesting processor.
*/ */
case class WriteBatch(persistentBatch: immutable.Seq[PersistentImpl], processor: ActorRef) case class WriteBatch(persistentBatch: immutable.Seq[PersistentRepr], processor: ActorRef)
/** /**
* Instructs a journal to persist a message. * Instructs a journal to persist a message.
@ -34,14 +34,14 @@ private[persistence] object JournalProtocol {
* @param persistent message to be persisted. * @param persistent message to be persisted.
* @param processor requesting processor. * @param processor requesting processor.
*/ */
case class Write(persistent: PersistentImpl, processor: ActorRef) case class Write(persistent: PersistentRepr, processor: ActorRef)
/** /**
* Reply message to a processor that `persistent` message has been successfully journaled. * Reply message to a processor that `persistent` message has been successfully journaled.
* *
* @param persistent persistent message. * @param persistent persistent message.
*/ */
case class WriteSuccess(persistent: PersistentImpl) case class WriteSuccess(persistent: PersistentRepr)
/** /**
* Reply message to a processor that `persistent` message could not be journaled. * Reply message to a processor that `persistent` message could not be journaled.
@ -49,7 +49,7 @@ private[persistence] object JournalProtocol {
* @param persistent persistent message. * @param persistent persistent message.
* @param cause failure cause. * @param cause failure cause.
*/ */
case class WriteFailure(persistent: PersistentImpl, cause: Throwable) case class WriteFailure(persistent: PersistentRepr, cause: Throwable)
/** /**
* Instructs a journal to loop a `message` back to `processor`, without persisting the * Instructs a journal to loop a `message` back to `processor`, without persisting the
@ -83,7 +83,7 @@ private[persistence] object JournalProtocol {
* *
* @param persistent persistent message. * @param persistent persistent message.
*/ */
case class Replayed(persistent: PersistentImpl) case class Replayed(persistent: PersistentRepr)
/** /**
* Reply message to a processor that all `persistent` messages have been replayed. * Reply message to a processor that all `persistent` messages have been replayed.

View file

@ -12,6 +12,8 @@ import scala.collection.immutable
import akka.actor.ActorRef import akka.actor.ActorRef
import akka.japi.Util.immutableSeq import akka.japi.Util.immutableSeq
import akka.persistence.serialization.Message
/** /**
* Persistent message. * Persistent message.
*/ */
@ -34,18 +36,11 @@ sealed abstract class Persistent {
* Creates a new persistent message with the specified `payload`. * Creates a new persistent message with the specified `payload`.
*/ */
def withPayload(payload: Any): Persistent def withPayload(payload: Any): Persistent
/**
* Called by [[Channel]] destinations to confirm the receipt of a persistent message.
*/
def confirm(): Unit
} }
object Persistent { object Persistent {
/** /**
* Java API. * Java API: creates a new persistent message. Must only be used outside processors.
*
* Creates a new persistent message. Must only be used outside processors.
* *
* @param payload payload of new persistent message. * @param payload payload of new persistent message.
*/ */
@ -53,9 +48,7 @@ object Persistent {
create(payload, null) create(payload, null)
/** /**
* Java API. * Java API: creates a new persistent message, derived from the specified current message. The current
*
* Creates a new persistent message, derived from the specified current message. The current
* message can be obtained inside a [[Processor]] by calling `getCurrentPersistentMessage()`. * message can be obtained inside a [[Processor]] by calling `getCurrentPersistentMessage()`.
* *
* @param payload payload of new persistent message. * @param payload payload of new persistent message.
@ -73,68 +66,212 @@ object Persistent {
* @param currentPersistentMessage optional current persistent message, defaults to `None`. * @param currentPersistentMessage optional current persistent message, defaults to `None`.
*/ */
def apply(payload: Any)(implicit currentPersistentMessage: Option[Persistent] = None): Persistent = def apply(payload: Any)(implicit currentPersistentMessage: Option[Persistent] = None): Persistent =
currentPersistentMessage.map(_.withPayload(payload)).getOrElse(PersistentImpl(payload)) currentPersistentMessage.map(_.withPayload(payload)).getOrElse(PersistentRepr(payload))
/** /**
* Persistent message extractor. * [[Persistent]] extractor.
*/ */
def unapply(persistent: Persistent): Option[(Any, Long)] = def unapply(persistent: Persistent): Option[(Any, Long)] =
Some((persistent.payload, persistent.sequenceNr)) Some((persistent.payload, persistent.sequenceNr))
} }
/**
* Persistent message that has been delivered by a [[Channel]] or [[PersistentChannel]]. Channel
* destinations that receive messages of this type can confirm their receipt by calling [[confirm]].
*/
sealed abstract class ConfirmablePersistent extends Persistent {
/**
* Called by [[Channel]] and [[PersistentChannel]] destinations to confirm the receipt of a
* persistent message.
*/
def confirm(): Unit
}
object ConfirmablePersistent {
/**
* [[ConfirmablePersistent]] extractor.
*/
def unapply(persistent: ConfirmablePersistent): Option[(Any, Long)] =
Some((persistent.payload, persistent.sequenceNr))
}
/** /**
* Instructs a [[Processor]] to atomically write the contained [[Persistent]] messages to the * Instructs a [[Processor]] to atomically write the contained [[Persistent]] messages to the
* journal. The processor receives the written messages individually as [[Persistent]] messages. * journal. The processor receives the written messages individually as [[Persistent]] messages.
* During recovery, they are also replayed individually. * During recovery, they are also replayed individually.
*/ */
case class PersistentBatch(persistentBatch: immutable.Seq[Persistent]) { case class PersistentBatch(persistentBatch: immutable.Seq[Persistent]) extends Message {
/** /**
* INTERNAL API. * INTERNAL API.
*/ */
private[persistence] def persistentImplList: List[PersistentImpl] = private[persistence] def persistentReprList: List[PersistentRepr] =
persistentBatch.toList.asInstanceOf[List[PersistentImpl]] persistentBatch.toList.asInstanceOf[List[PersistentRepr]]
}
/**
* Plugin API: representation of a persistent message in the journal plugin API.
*
* @see[[SyncWriteJournal]]
* @see[[AsyncWriteJournal]]
* @see[[AsyncReplay]]
*/
trait PersistentRepr extends Persistent with Message {
import scala.collection.JavaConverters._
/**
* This persistent message's payload.
*/
def payload: Any
/**
* This persistent message's seuence number.
*/
def sequenceNr: Long
/**
* Id of processor that journals the message
*/
def processorId: String
/**
* `true` if this message is marked as deleted.
*/
def deleted: Boolean
/**
* `true` by default, `false` for replayed messages. Set to `true` by a channel if this
* message is replayed and its sender reference was resolved. Channels use this field to
* avoid redundant sender reference resolutions.
*/
def resolved: Boolean
/**
* Channel ids of delivery confirmations that are available for this message. Only non-empty
* for replayed messages.
*/
def confirms: immutable.Seq[String]
/**
* Java API, Plugin API: channel ids of delivery confirmations that are available for this
* message. Only non-empty for replayed messages.
*/
def getConfirms: JList[String] = confirms.asJava
/**
* `true` only if this message has been delivered by a channel.
*/
def confirmable: Boolean
/**
* Delivery confirmation message.
*/
def confirmMessage: Confirm
/**
* Delivery confirmation message.
*/
def confirmTarget: ActorRef
/**
* Sender of this message.
*/
def sender: ActorRef
private[persistence] def prepareWrite(sender: ActorRef): PersistentRepr
private[persistence] def update(
sequenceNr: Long = sequenceNr,
processorId: String = processorId,
deleted: Boolean = deleted,
resolved: Boolean = resolved,
confirms: immutable.Seq[String] = confirms,
confirmMessage: Confirm = confirmMessage,
confirmTarget: ActorRef = confirmTarget): PersistentRepr
}
object PersistentRepr {
/**
* Plugin API: value of an undefined processor or channel id.
*/
val Undefined = ""
/**
* Plugin API.
*/
def apply(
payload: Any,
sequenceNr: Long = 0L,
processorId: String = PersistentRepr.Undefined,
deleted: Boolean = false,
resolved: Boolean = true,
confirms: immutable.Seq[String] = Nil,
confirmable: Boolean = false,
confirmMessage: Confirm = null,
confirmTarget: ActorRef = null,
sender: ActorRef = null) =
if (confirmable) ConfirmablePersistentImpl(payload, sequenceNr, processorId, deleted, resolved, confirms, confirmMessage, confirmTarget, sender)
else PersistentImpl(payload, sequenceNr, processorId, deleted, confirms, sender)
/**
* Java API, Plugin API.
*/
def create = apply _
} }
object PersistentBatch { object PersistentBatch {
/** /**
* JAVA API. * Java API.
*/ */
def create(persistentBatch: JIterable[Persistent]) = def create(persistentBatch: JIterable[Persistent]) =
PersistentBatch(immutableSeq(persistentBatch)) PersistentBatch(immutableSeq(persistentBatch))
} }
/** /**
* Plugin API. * INTERNAL API.
*
* Internal [[Persistent]] message representation.
*
* @param processorId Id of processor that journaled the message.
* @param channelId Id of last channel that delivered the message to a destination.
* @param sender Serialized sender reference.
* @param deleted `true` if this message is marked as deleted.
* @param resolved `true` by default, `false` for replayed messages. Set to `true` by a channel if this
* message is replayed and its sender reference was resolved. Channels use this field to
* avoid redundant sender reference resolutions.
* @param confirms Channel ids of delivery confirmations that are available for this message. Only non-empty
* for replayed messages.
* @param confirmTarget Delivery confirmation target.
* @param confirmMessage Delivery confirmation message.
*
* @see [[Processor]]
* @see [[Channel]]
* @see [[Deliver]]
*/ */
case class PersistentImpl( private[persistence] case class PersistentImpl(
payload: Any, payload: Any,
sequenceNr: Long = 0L, sequenceNr: Long,
processorId: String = PersistentImpl.Undefined, processorId: String,
channelId: String = PersistentImpl.Undefined, deleted: Boolean,
deleted: Boolean = false, confirms: immutable.Seq[String],
resolved: Boolean = true, sender: ActorRef) extends Persistent with PersistentRepr {
confirms: Seq[String] = Nil,
confirmMessage: Confirm = null, def withPayload(payload: Any): Persistent =
confirmTarget: ActorRef = null, copy(payload = payload)
sender: ActorRef = null) extends Persistent {
def prepareWrite(sender: ActorRef) =
copy(sender = sender)
def update(
sequenceNr: Long,
processorId: String,
deleted: Boolean,
resolved: Boolean,
confirms: immutable.Seq[String],
confirmMessage: Confirm,
confirmTarget: ActorRef) =
copy(sequenceNr = sequenceNr, processorId = processorId, deleted = deleted, confirms = confirms)
val resolved: Boolean = false
val confirmable: Boolean = false
val confirmMessage: Confirm = null
val confirmTarget: ActorRef = null
}
/**
* INTERNAL API.
*/
private[persistence] case class ConfirmablePersistentImpl(
payload: Any,
sequenceNr: Long,
processorId: String,
deleted: Boolean,
resolved: Boolean,
confirms: immutable.Seq[String],
confirmMessage: Confirm,
confirmTarget: ActorRef,
sender: ActorRef) extends ConfirmablePersistent with PersistentRepr {
def withPayload(payload: Any): Persistent = def withPayload(payload: Any): Persistent =
copy(payload = payload) copy(payload = payload)
@ -142,41 +279,23 @@ case class PersistentImpl(
def confirm(): Unit = def confirm(): Unit =
if (confirmTarget != null) confirmTarget ! confirmMessage if (confirmTarget != null) confirmTarget ! confirmMessage
import scala.collection.JavaConverters._ def confirmable = true
/** def prepareWrite(sender: ActorRef) =
* Java Plugin API. copy(sender = sender, resolved = false, confirmMessage = null, confirmTarget = null)
*/
def getConfirms: JList[String] = confirms.asJava
private[persistence] def prepareWrite(sender: ActorRef) = def update(sequenceNr: Long, processorId: String, deleted: Boolean, resolved: Boolean, confirms: immutable.Seq[String], confirmMessage: Confirm, confirmTarget: ActorRef) =
copy(sender = sender, resolved = false, confirmTarget = null, confirmMessage = null) copy(sequenceNr = sequenceNr, processorId = processorId, deleted = deleted, resolved = resolved, confirms = confirms, confirmMessage = confirmMessage, confirmTarget = confirmTarget)
} }
object PersistentImpl { private[persistence] object ConfirmablePersistentImpl {
val Undefined = "" def apply(persistent: PersistentRepr, confirmMessage: Confirm, confirmTarget: ActorRef): ConfirmablePersistentImpl =
ConfirmablePersistentImpl(persistent.payload, persistent.sequenceNr, persistent.processorId, persistent.deleted, persistent.resolved, persistent.confirms, confirmMessage, confirmTarget, persistent.sender)
/**
* Java Plugin API.
*/
def create(payload: Any, sequenceNr: Long, processorId: String, channelId: String, deleted: Boolean, resolved: Boolean, confirms: Seq[String], confirmMessage: Confirm, confirmTarget: ActorRef, sender: ActorRef): PersistentImpl =
PersistentImpl(payload, sequenceNr, processorId, channelId, deleted, resolved, confirms, confirmMessage, confirmTarget, sender)
} }
/** /**
* Sent to a [[Processor]] when a journal failed to write a [[Persistent]] message. If * INTERNAL API.
* not handled, an `akka.actor.ActorKilledException` is thrown by that processor.
* *
* @param payload payload of the persistent message. * Message to confirm the receipt of a [[ConfirmablePersistent]] message.
* @param sequenceNr sequence number of the persistent message.
* @param cause failure cause.
*/ */
case class PersistenceFailure(payload: Any, sequenceNr: Long, cause: Throwable) private[persistence] case class Confirm(processorId: String, sequenceNr: Long, channelId: String) extends Message
/**
* Internal API.
*
* Message to confirm the receipt of a persistent message (sent via a [[Channel]]).
*/
@SerialVersionUID(1L)
private[persistence] case class Confirm(processorId: String, sequenceNr: Long, channelId: String)

View file

@ -4,8 +4,6 @@
package akka.persistence package akka.persistence
import scala.collection.immutable
import akka.actor._ import akka.actor._
import akka.dispatch._ import akka.dispatch._
@ -53,7 +51,7 @@ import akka.dispatch._
* @see [[Recover]] * @see [[Recover]]
* @see [[PersistentBatch]] * @see [[PersistentBatch]]
*/ */
trait Processor extends Actor with Stash { trait Processor extends Actor with Stash with StashFactory {
import JournalProtocol._ import JournalProtocol._
import SnapshotProtocol._ import SnapshotProtocol._
@ -155,8 +153,8 @@ trait Processor extends Actor with Stash {
} }
} }
case LoopSuccess(m) process(receive, m) case LoopSuccess(m) process(receive, m)
case p: PersistentImpl journal forward Write(p.copy(processorId = processorId, sequenceNr = nextSequenceNr()), self) case p: PersistentRepr journal forward Write(p.update(processorId = processorId, sequenceNr = nextSequenceNr()), self)
case pb: PersistentBatch journal forward WriteBatch(pb.persistentImplList.map(_.copy(processorId = processorId, sequenceNr = nextSequenceNr())), self) case pb: PersistentBatch journal forward WriteBatch(pb.persistentReprList.map(_.update(processorId = processorId, sequenceNr = nextSequenceNr())), self)
case m journal forward Loop(m, self) case m journal forward Loop(m, self)
} }
} }
@ -241,9 +239,50 @@ trait Processor extends Actor with Stash {
* recovery. This method is usually called inside `preRestartProcessor` when a persistent message * recovery. This method is usually called inside `preRestartProcessor` when a persistent message
* caused an exception. Processors that want to re-receive that persistent message during recovery * caused an exception. Processors that want to re-receive that persistent message during recovery
* should not call this method. * should not call this method.
*
* @param persistent persistent message to be marked as deleted.
* @throws IllegalArgumentException if `persistent` message has not been persisted by this
* processor.
*/ */
def deleteMessage(persistent: Persistent): Unit = { def deleteMessage(persistent: Persistent): Unit = {
journal ! Delete(persistent) deleteMessage(persistent, false)
}
/**
* Deletes a `persistent` message. If `physical` is set to `false` (default), the persistent
* message is marked as deleted in the journal, otherwise it is physically deleted from the
* journal. A deleted message is not replayed during recovery. This method is usually called
* inside `preRestartProcessor` when a persistent message caused an exception. Processors that
* want to re-receive that persistent message during recovery should not call this method.
*
* @param persistent persistent message to be deleted.
* @param physical if `false` (default), the message is marked as deleted, otherwise it is
* physically deleted.
* @throws IllegalArgumentException if `persistent` message has not been persisted by this
* processor.
*/
def deleteMessage(persistent: Persistent, physical: Boolean): Unit = {
val impl = persistent.asInstanceOf[PersistentRepr]
if (impl.processorId != processorId)
throw new IllegalArgumentException(
s"persistent message to be deleted (processor id = [${impl.processorId}], sequence number = [${impl.sequenceNr}]) " +
s"has not been persisted by this processor (processor id = [${processorId}])")
else deleteMessage(impl.sequenceNr, physical)
}
/**
* Deletes a persistent message identified by `sequenceNr`. If `physical` is set to `false`,
* the persistent message is marked as deleted in the journal, otherwise it is physically
* deleted from the journal. A deleted message is not replayed during recovery. This method
* is usually called inside `preRestartProcessor` when a persistent message caused an exception.
* Processors that want to re-receive that persistent message during recovery should not call
* this method.
*
* @param sequenceNr sequence number of the persistent message to be deleted.
* @param physical if `false`, the message is marked as deleted, otherwise it is physically deleted.
*/
def deleteMessage(sequenceNr: Long, physical: Boolean): Unit = {
journal ! Delete(processorId, sequenceNr, physical)
} }
/** /**
@ -351,71 +390,25 @@ trait Processor extends Actor with Stash {
case _ true case _ true
} }
private val processorStash = private val processorStash = createStash()
createProcessorStash
private def currentEnvelope: Envelope = private def currentEnvelope: Envelope =
context.asInstanceOf[ActorCell].currentMessage context.asInstanceOf[ActorCell].currentMessage
/**
* INTERNAL API.
*/
private[persistence] def createProcessorStash = new ProcessorStash {
var theStash = Vector.empty[Envelope]
def stash(): Unit =
theStash :+= currentEnvelope
def prepend(others: immutable.Seq[Envelope]): Unit =
others.reverseIterator.foreach(env theStash = env +: theStash)
def unstash(): Unit = try {
if (theStash.nonEmpty) {
mailbox.enqueueFirst(self, theStash.head)
theStash = theStash.tail
}
}
def unstashAll(): Unit = try {
val i = theStash.reverseIterator
while (i.hasNext) mailbox.enqueueFirst(self, i.next())
} finally {
theStash = Vector.empty[Envelope]
}
}
} }
/** /**
* INTERNAL API. * Sent to a [[Processor]] when a journal failed to write a [[Persistent]] message. If
* not handled, an `akka.actor.ActorKilledException` is thrown by that processor.
* *
* Processor specific stash used internally to avoid interference with user stash. * @param payload payload of the persistent message.
* @param sequenceNr sequence number of the persistent message.
* @param cause failure cause.
*/ */
private[persistence] trait ProcessorStash { case class PersistenceFailure(payload: Any, sequenceNr: Long, cause: Throwable)
/**
* Appends the current message to this stash.
*/
def stash()
/**
* Prepends `others` to this stash.
*/
def prepend(others: immutable.Seq[Envelope])
/**
* Unstashes a single message from this stash.
*/
def unstash()
/**
* Unstashes all messages from this stash.
*/
def unstashAll()
}
/** /**
* Java API. * Java API: an actor that persists (journals) messages of type [[Persistent]]. Messages of other types
* * are not persisted.
* An actor that persists (journals) messages of type [[Persistent]]. Messages of other types are not persisted.
* *
* {{{ * {{{
* import akka.persistence.Persistent; * import akka.persistence.Persistent;
@ -468,12 +461,8 @@ private[persistence] trait ProcessorStash {
* @see [[PersistentBatch]] * @see [[PersistentBatch]]
*/ */
abstract class UntypedProcessor extends UntypedActor with Processor { abstract class UntypedProcessor extends UntypedActor with Processor {
/** /**
* Java API. * Java API. returns the current persistent message or `null` if there is none.
*
* Returns the current persistent message or `null` if there is none.
*/ */
def getCurrentPersistentMessage = currentPersistentMessage.getOrElse(null) def getCurrentPersistentMessage = currentPersistentMessage.getOrElse(null)
} }

View file

@ -1,4 +1,5 @@
/** /**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
* Copyright (C) 2012-2013 Eligotech BV. * Copyright (C) 2012-2013 Eligotech BV.
*/ */
@ -82,9 +83,7 @@ object SnapshotSelectionCriteria {
} }
/** /**
* Plugin API. * Plugin API: a selected snapshot matching [[SnapshotSelectionCriteria]].
*
* A selected snapshot matching [[SnapshotSelectionCriteria]].
* *
* @param metadata snapshot metadata. * @param metadata snapshot metadata.
* @param snapshot snapshot. * @param snapshot snapshot.

View file

@ -6,7 +6,7 @@ package akka.persistence.journal
import scala.concurrent.Future import scala.concurrent.Future
import akka.persistence.PersistentImpl import akka.persistence.PersistentRepr
/** /**
* Asynchronous message replay interface. * Asynchronous message replay interface.
@ -14,17 +14,15 @@ import akka.persistence.PersistentImpl
trait AsyncReplay { trait AsyncReplay {
//#journal-plugin-api //#journal-plugin-api
/** /**
* Plugin API. * Plugin API: asynchronously replays persistent messages. Implementations replay
* * a message by calling `replayCallback`. The returned future must be completed
* Asynchronously replays persistent messages. Implementations replay a message * when all messages (matching the sequence number bounds) have been replayed. The
* by calling `replayCallback`. The returned future must be completed when all * future `Long` value must be the highest stored sequence number in the journal
* messages (matching the sequence number bounds) have been replayed. The future * for the specified processor. The future must be completed with a failure if any
* `Long` value must be the highest stored sequence number in the journal for the * of the persistent messages could not be replayed.
* specified processor. The future must be completed with a failure if any of
* the persistent messages could not be replayed.
* *
* The `replayCallback` must also be called with messages that have been marked * The `replayCallback` must also be called with messages that have been marked
* as deleted. In this case a replayed message's `deleted` field must be set to * as deleted. In this case a replayed message's `deleted` method must return
* `true`. * `true`.
* *
* The channel ids of delivery confirmations that are available for a replayed * The channel ids of delivery confirmations that are available for a replayed
@ -39,6 +37,6 @@ trait AsyncReplay {
* @see [[AsyncWriteJournal]] * @see [[AsyncWriteJournal]]
* @see [[SyncWriteJournal]] * @see [[SyncWriteJournal]]
*/ */
def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: PersistentImpl Unit): Future[Long] def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: PersistentRepr Unit): Future[Long]
//#journal-plugin-api //#journal-plugin-api
} }

View file

@ -1,4 +1,5 @@
/** /**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
* Copyright (C) 2012-2013 Eligotech BV. * Copyright (C) 2012-2013 Eligotech BV.
*/ */
@ -39,7 +40,7 @@ trait AsyncWriteJournal extends Actor with AsyncReplay {
val csdr = sender val csdr = sender
val cctr = resequencerCounter val cctr = resequencerCounter
val psdr = if (sender.isInstanceOf[PromiseActorRef]) context.system.deadLetters else sender val psdr = if (sender.isInstanceOf[PromiseActorRef]) context.system.deadLetters else sender
def resequence(f: PersistentImpl Any) = persistentBatch.zipWithIndex.foreach { def resequence(f: PersistentRepr Any) = persistentBatch.zipWithIndex.foreach {
case (p, i) resequencer ! Desequenced(f(p), cctr + i, processor, csdr) case (p, i) resequencer ! Desequenced(f(p), cctr + i, processor, csdr)
} }
writeBatchAsync(persistentBatch.map(_.prepareWrite(psdr))) onComplete { writeBatchAsync(persistentBatch.map(_.prepareWrite(psdr))) onComplete {
@ -66,9 +67,9 @@ trait AsyncWriteJournal extends Actor with AsyncReplay {
} }
context.system.eventStream.publish(c) context.system.eventStream.publish(c)
} }
case Delete(persistent: PersistentImpl) { case d @ Delete(processorId, sequenceNr, physical) {
deleteAsync(persistent) onComplete { deleteAsync(processorId, sequenceNr, physical) onComplete {
case Success(_) // TODO: publish success to event stream case Success(_) context.system.eventStream.publish(d)
case Failure(e) // TODO: publish failure to event stream case Failure(e) // TODO: publish failure to event stream
} }
} }
@ -80,31 +81,26 @@ trait AsyncWriteJournal extends Actor with AsyncReplay {
//#journal-plugin-api //#journal-plugin-api
/** /**
* Plugin API. * Plugin API: asynchronously writes a `persistent` message to the journal.
*
* Asynchronously writes a `persistent` message to the journal.
*/ */
def writeAsync(persistent: PersistentImpl): Future[Unit] def writeAsync(persistent: PersistentRepr): Future[Unit]
/** /**
* Plugin API. * Plugin API: asynchronously writes a batch of persistent messages to the journal.
* * The batch write must be atomic i.e. either all persistent messages in the batch
* Asynchronously writes a batch of persistent messages to the journal. The batch write * are written or none.
* must be atomic i.e. either all persistent messages in the batch are written or none.
*/ */
def writeBatchAsync(persistentBatch: immutable.Seq[PersistentImpl]): Future[Unit] def writeBatchAsync(persistentBatch: immutable.Seq[PersistentRepr]): Future[Unit]
/** /**
* Plugin API. * Plugin API: asynchronously deletes a persistent message. If `physical` is set to
* * `false`, the persistent message is marked as deleted, otherwise it is physically
* Asynchronously marks a `persistent` message as deleted. * deleted.
*/ */
def deleteAsync(persistent: PersistentImpl): Future[Unit] def deleteAsync(processorId: String, sequenceNr: Long, physical: Boolean): Future[Unit]
/** /**
* Plugin API. * Plugin API: asynchronously writes a delivery confirmation to the journal.
*
* Asynchronously writes a delivery confirmation to the journal.
*/ */
def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit] def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit]
//#journal-plugin-api //#journal-plugin-api

View file

@ -1,4 +1,5 @@
/** /**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
* Copyright (C) 2012-2013 Eligotech BV. * Copyright (C) 2012-2013 Eligotech BV.
*/ */
@ -48,8 +49,9 @@ trait SyncWriteJournal extends Actor with AsyncReplay {
confirm(processorId, sequenceNr, channelId) confirm(processorId, sequenceNr, channelId)
context.system.eventStream.publish(c) // TODO: turn off by default and allow to turn on by configuration context.system.eventStream.publish(c) // TODO: turn off by default and allow to turn on by configuration
} }
case Delete(persistent: PersistentImpl) { case d @ Delete(processorId, sequenceNr, physical) {
delete(persistent) delete(processorId, sequenceNr, physical)
context.system.eventStream.publish(d) // TODO: turn off by default and allow to turn on by configuration
} }
case Loop(message, processor) { case Loop(message, processor) {
processor forward LoopSuccess(message) processor forward LoopSuccess(message)
@ -58,31 +60,26 @@ trait SyncWriteJournal extends Actor with AsyncReplay {
//#journal-plugin-api //#journal-plugin-api
/** /**
* Plugin API. * Plugin API: synchronously writes a `persistent` message to the journal.
*
* Synchronously writes a `persistent` message to the journal.
*/ */
def write(persistent: PersistentImpl): Unit def write(persistent: PersistentRepr): Unit
/** /**
* Plugin API. * Plugin API: synchronously writes a batch of persistent messages to the journal.
* * The batch write must be atomic i.e. either all persistent messages in the batch
* Synchronously writes a batch of persistent messages to the journal. The batch write * are written or none.
* must be atomic i.e. either all persistent messages in the batch are written or none.
*/ */
def writeBatch(persistentBatch: immutable.Seq[PersistentImpl]): Unit def writeBatch(persistentBatch: immutable.Seq[PersistentRepr]): Unit
/** /**
* Plugin API. * Plugin API: synchronously deletes a persistent message. If `physical` is set to
* * `false`, the persistent message is marked as deleted, otherwise it is physically
* Synchronously marks a `persistent` message as deleted. * deleted.
*/ */
def delete(persistent: PersistentImpl): Unit def delete(processorId: String, sequenceNr: Long, physical: Boolean): Unit
/** /**
* Plugin API. * Plugin API: synchronously writes a delivery confirmation to the journal.
*
* Synchronously writes a delivery confirmation to the journal.
*/ */
def confirm(processorId: String, sequenceNr: Long, channelId: String): Unit def confirm(processorId: String, sequenceNr: Long, channelId: String): Unit
//#journal-plugin-api //#journal-plugin-api

View file

@ -27,19 +27,19 @@ private[persistence] class InmemJournal extends AsyncWriteJournal {
import InmemStore._ import InmemStore._
def writeAsync(persistent: PersistentImpl): Future[Unit] = def writeAsync(persistent: PersistentRepr): Future[Unit] =
(store ? Write(persistent)).mapTo[Unit] (store ? Write(persistent)).mapTo[Unit]
def writeBatchAsync(persistentBatch: immutable.Seq[PersistentImpl]): Future[Unit] = def writeBatchAsync(persistentBatch: immutable.Seq[PersistentRepr]): Future[Unit] =
(store ? WriteBatch(persistentBatch)).mapTo[Unit] (store ? WriteBatch(persistentBatch)).mapTo[Unit]
def deleteAsync(persistent: PersistentImpl): Future[Unit] = def deleteAsync(processorId: String, sequenceNr: Long, physical: Boolean): Future[Unit] =
(store ? Delete(persistent)).mapTo[Unit] (store ? Delete(processorId, sequenceNr, physical)).mapTo[Unit]
def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit] = def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit] =
(store ? Confirm(processorId, sequenceNr, channelId)).mapTo[Unit] (store ? Confirm(processorId, sequenceNr, channelId)).mapTo[Unit]
def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: (PersistentImpl) Unit): Future[Long] = def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: (PersistentRepr) Unit): Future[Long] =
(store ? Replay(processorId, fromSequenceNr, toSequenceNr, replayCallback)).mapTo[Long] (store ? Replay(processorId, fromSequenceNr, toSequenceNr, replayCallback)).mapTo[Long]
} }
@ -47,7 +47,7 @@ private[persistence] class InmemStore extends Actor {
import InmemStore._ import InmemStore._
// processor id => persistent message // processor id => persistent message
var messages = Map.empty[String, Vector[PersistentImpl]] var messages = Map.empty[String, Vector[PersistentRepr]]
def receive = { def receive = {
case Write(p) case Write(p)
@ -56,11 +56,14 @@ private[persistence] class InmemStore extends Actor {
case WriteBatch(pb) case WriteBatch(pb)
pb.foreach(add) pb.foreach(add)
success() success()
case Delete(p) case Delete(pid, snr, false)
update(p.processorId, p.sequenceNr)(_.copy(deleted = true)) update(pid, snr)(_.update(deleted = true))
success()
case Delete(pid, snr, true)
delete(pid, snr)
success() success()
case Confirm(pid, snr, cid) case Confirm(pid, snr, cid)
update(pid, snr)(p p.copy(confirms = cid +: p.confirms)) update(pid, snr)(p p.update(confirms = cid +: p.confirms))
success() success()
case Replay(pid, fromSnr, toSnr, callback) { case Replay(pid, fromSnr, toSnr, callback) {
for { for {
@ -76,16 +79,21 @@ private[persistence] class InmemStore extends Actor {
private def success(reply: Any = ()) = private def success(reply: Any = ()) =
sender ! reply sender ! reply
private def add(p: PersistentImpl) = messages = messages + (messages.get(p.processorId) match { private def add(p: PersistentRepr) = messages = messages + (messages.get(p.processorId) match {
case Some(ms) p.processorId -> (ms :+ p) case Some(ms) p.processorId -> (ms :+ p)
case None p.processorId -> Vector(p) case None p.processorId -> Vector(p)
}) })
private def update(pid: String, snr: Long)(f: PersistentImpl PersistentImpl) = messages = messages.get(pid) match { private def update(pid: String, snr: Long)(f: PersistentRepr PersistentRepr) = messages = messages.get(pid) match {
case Some(ms) messages + (pid -> ms.map(sp if (sp.sequenceNr == snr) f(sp) else sp)) case Some(ms) messages + (pid -> ms.map(sp if (sp.sequenceNr == snr) f(sp) else sp))
case None messages case None messages
} }
private def delete(pid: String, snr: Long) = messages = messages.get(pid) match {
case Some(ms) messages + (pid -> ms.filterNot(_.sequenceNr == snr))
case None messages
}
private def maxSequenceNr(pid: String): Long = { private def maxSequenceNr(pid: String): Long = {
val snro = for { val snro = for {
ms messages.get(pid) ms messages.get(pid)
@ -96,9 +104,9 @@ private[persistence] class InmemStore extends Actor {
} }
private[persistence] object InmemStore { private[persistence] object InmemStore {
case class Write(p: PersistentImpl) case class Write(p: PersistentRepr)
case class WriteBatch(pb: Seq[PersistentImpl]) case class WriteBatch(pb: Seq[PersistentRepr])
case class Delete(p: PersistentImpl) case class Delete(processorId: String, sequenceNr: Long, physical: Boolean)
case class Confirm(processorId: String, sequenceNr: Long, channelId: String) case class Confirm(processorId: String, sequenceNr: Long, channelId: String)
case class Replay(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, replayCallback: (PersistentImpl) Unit) case class Replay(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, replayCallback: (PersistentRepr) Unit)
} }

View file

@ -11,18 +11,16 @@ import scala.concurrent.Future
import akka.actor.Actor import akka.actor.Actor
import akka.japi.Procedure import akka.japi.Procedure
import akka.persistence.journal.{ AsyncReplay SAsyncReplay } import akka.persistence.journal.{ AsyncReplay SAsyncReplay }
import akka.persistence.PersistentImpl import akka.persistence.PersistentRepr
/** /**
* Java API. * Java API: asynchronous message replay interface.
*
* Asynchronous message replay interface.
*/ */
abstract class AsyncReplay extends SAsyncReplay with AsyncReplayPlugin { this: Actor abstract class AsyncReplay extends SAsyncReplay with AsyncReplayPlugin { this: Actor
import context.dispatcher import context.dispatcher
final def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: (PersistentImpl) Unit) = final def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: (PersistentRepr) Unit) =
doReplayAsync(processorId, fromSequenceNr, toSequenceNr, new Procedure[PersistentImpl] { doReplayAsync(processorId, fromSequenceNr, toSequenceNr, new Procedure[PersistentRepr] {
def apply(p: PersistentImpl) = replayCallback(p) def apply(p: PersistentRepr) = replayCallback(p)
}).map(_.longValue) }).map(_.longValue)
} }

View file

@ -8,24 +8,22 @@ import scala.collection.immutable
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import akka.persistence.journal.{ AsyncWriteJournal SAsyncWriteJournal } import akka.persistence.journal.{ AsyncWriteJournal SAsyncWriteJournal }
import akka.persistence.PersistentImpl import akka.persistence.PersistentRepr
/** /**
* Java API. * Java API: abstract journal, optimized for asynchronous, non-blocking writes.
*
* Abstract journal, optimized for asynchronous, non-blocking writes.
*/ */
abstract class AsyncWriteJournal extends AsyncReplay with SAsyncWriteJournal with AsyncWritePlugin { abstract class AsyncWriteJournal extends AsyncReplay with SAsyncWriteJournal with AsyncWritePlugin {
import context.dispatcher import context.dispatcher
final def writeAsync(persistent: PersistentImpl) = final def writeAsync(persistent: PersistentRepr) =
doWriteAsync(persistent).map(Unit.unbox) doWriteAsync(persistent).map(Unit.unbox)
final def writeBatchAsync(persistentBatch: immutable.Seq[PersistentImpl]) = final def writeBatchAsync(persistentBatch: immutable.Seq[PersistentRepr]) =
doWriteBatchAsync(persistentBatch.asJava).map(Unit.unbox) doWriteBatchAsync(persistentBatch.asJava).map(Unit.unbox)
final def deleteAsync(persistent: PersistentImpl) = final def deleteAsync(processorId: String, sequenceNr: Long, physical: Boolean) =
doDeleteAsync(persistent).map(Unit.unbox) doDeleteAsync(processorId, sequenceNr, physical).map(Unit.unbox)
final def confirmAsync(processorId: String, sequenceNr: Long, channelId: String) = final def confirmAsync(processorId: String, sequenceNr: Long, channelId: String) =
doConfirmAsync(processorId, sequenceNr, channelId).map(Unit.unbox) doConfirmAsync(processorId, sequenceNr, channelId).map(Unit.unbox)

View file

@ -8,22 +8,20 @@ import scala.collection.immutable
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import akka.persistence.journal.{ SyncWriteJournal SSyncWriteJournal } import akka.persistence.journal.{ SyncWriteJournal SSyncWriteJournal }
import akka.persistence.PersistentImpl import akka.persistence.PersistentRepr
/** /**
* Java API. * Java API: abstract journal, optimized for synchronous writes.
*
* Abstract journal, optimized for synchronous writes.
*/ */
abstract class SyncWriteJournal extends AsyncReplay with SSyncWriteJournal with SyncWritePlugin { abstract class SyncWriteJournal extends AsyncReplay with SSyncWriteJournal with SyncWritePlugin {
final def write(persistent: PersistentImpl) = final def write(persistent: PersistentRepr) =
doWrite(persistent) doWrite(persistent)
final def writeBatch(persistentBatch: immutable.Seq[PersistentImpl]) = final def writeBatch(persistentBatch: immutable.Seq[PersistentRepr]) =
doWriteBatch(persistentBatch.asJava) doWriteBatch(persistentBatch.asJava)
final def delete(persistent: PersistentImpl) = final def delete(processorId: String, sequenceNr: Long, physical: Boolean) =
doDelete(persistent) doDelete(processorId, sequenceNr, physical)
final def confirm(processorId: String, sequenceNr: Long, channelId: String) = final def confirm(processorId: String, sequenceNr: Long, channelId: String) =
doConfirm(processorId, sequenceNr, channelId) doConfirm(processorId, sequenceNr, channelId)

View file

@ -1,4 +1,5 @@
/** /**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
* Copyright (C) 2012-2013 Eligotech BV. * Copyright (C) 2012-2013 Eligotech BV.
*/ */
@ -38,14 +39,18 @@ private[leveldb] class LeveldbJournal extends SyncWriteJournal with LeveldbIdMap
import Key._ import Key._
def write(persistent: PersistentImpl) = def write(persistent: PersistentRepr) =
withBatch(batch addToBatch(persistent, batch)) withBatch(batch addToBatch(persistent, batch))
def writeBatch(persistentBatch: immutable.Seq[PersistentImpl]) = def writeBatch(persistentBatch: immutable.Seq[PersistentRepr]) =
withBatch(batch persistentBatch.foreach(persistent addToBatch(persistent, batch))) withBatch(batch persistentBatch.foreach(persistent addToBatch(persistent, batch)))
def delete(persistent: PersistentImpl) { def delete(processorId: String, sequenceNr: Long, physical: Boolean) {
leveldb.put(keyToBytes(deletionKey(numericId(persistent.processorId), persistent.sequenceNr)), Array.empty[Byte]) if (physical)
// TODO: delete confirmations and deletion markers, if any.
leveldb.delete(keyToBytes(Key(numericId(processorId), sequenceNr, 0)))
else
leveldb.put(keyToBytes(deletionKey(numericId(processorId), sequenceNr)), Array.empty[Byte])
} }
def confirm(processorId: String, sequenceNr: Long, channelId: String) { def confirm(processorId: String, sequenceNr: Long, channelId: String) {
@ -55,10 +60,10 @@ private[leveldb] class LeveldbJournal extends SyncWriteJournal with LeveldbIdMap
def leveldbSnapshot = leveldbReadOptions.snapshot(leveldb.getSnapshot) def leveldbSnapshot = leveldbReadOptions.snapshot(leveldb.getSnapshot)
def leveldbIterator = leveldb.iterator(leveldbSnapshot) def leveldbIterator = leveldb.iterator(leveldbSnapshot)
def persistentToBytes(p: PersistentImpl): Array[Byte] = serialization.serialize(p).get def persistentToBytes(p: PersistentRepr): Array[Byte] = serialization.serialize(p).get
def persistentFromBytes(a: Array[Byte]): PersistentImpl = serialization.deserialize(a, classOf[PersistentImpl]).get def persistentFromBytes(a: Array[Byte]): PersistentRepr = serialization.deserialize(a, classOf[PersistentRepr]).get
private def addToBatch(persistent: PersistentImpl, batch: WriteBatch): Unit = { private def addToBatch(persistent: PersistentRepr, batch: WriteBatch): Unit = {
val nid = numericId(persistent.processorId) val nid = numericId(persistent.processorId)
batch.put(keyToBytes(counterKey(nid)), counterToBytes(persistent.sequenceNr)) batch.put(keyToBytes(counterKey(nid)), counterToBytes(persistent.sequenceNr))
batch.put(keyToBytes(Key(nid, persistent.sequenceNr, 0)), persistentToBytes(persistent)) batch.put(keyToBytes(Key(nid, persistent.sequenceNr, 0)), persistentToBytes(persistent))

View file

@ -1,4 +1,5 @@
/** /**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
* Copyright (C) 2012-2013 Eligotech BV. * Copyright (C) 2012-2013 Eligotech BV.
*/ */

View file

@ -1,4 +1,5 @@
/** /**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
* Copyright (C) 2012-2013 Eligotech BV. * Copyright (C) 2012-2013 Eligotech BV.
*/ */
@ -18,14 +19,14 @@ private[persistence] trait LeveldbReplay extends AsyncReplay { this: LeveldbJour
private val replayDispatcherId = context.system.settings.config.getString("akka.persistence.journal.leveldb.replay-dispatcher") private val replayDispatcherId = context.system.settings.config.getString("akka.persistence.journal.leveldb.replay-dispatcher")
private val replayDispatcher = context.system.dispatchers.lookup(replayDispatcherId) private val replayDispatcher = context.system.dispatchers.lookup(replayDispatcherId)
def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: PersistentImpl Unit): Future[Long] = def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: PersistentRepr Unit): Future[Long] =
Future(replay(numericId(processorId), fromSequenceNr: Long, toSequenceNr)(replayCallback))(replayDispatcher) Future(replay(numericId(processorId), fromSequenceNr: Long, toSequenceNr)(replayCallback))(replayDispatcher)
private def replay(processorId: Int, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: PersistentImpl Unit): Long = { private def replay(processorId: Int, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: PersistentRepr Unit): Long = {
val iter = leveldbIterator val iter = leveldbIterator
@scala.annotation.tailrec @scala.annotation.tailrec
def go(key: Key, replayCallback: PersistentImpl Unit) { def go(key: Key, replayCallback: PersistentRepr Unit) {
if (iter.hasNext) { if (iter.hasNext) {
val nextEntry = iter.next() val nextEntry = iter.next()
val nextKey = keyFromBytes(nextEntry.getKey) val nextKey = keyFromBytes(nextEntry.getKey)
@ -38,7 +39,7 @@ private[persistence] trait LeveldbReplay extends AsyncReplay { this: LeveldbJour
val msg = persistentFromBytes(nextEntry.getValue) val msg = persistentFromBytes(nextEntry.getValue)
val del = deletion(nextKey) val del = deletion(nextKey)
val cnf = confirms(nextKey, Nil) val cnf = confirms(nextKey, Nil)
replayCallback(msg.copy(confirms = cnf, deleted = del)) replayCallback(msg.update(confirms = cnf, deleted = del))
go(nextKey, replayCallback) go(nextKey, replayCallback)
} }
} }

View file

@ -1,4 +1,5 @@
/** /**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
* Copyright (C) 2012-2013 Eligotech BV. * Copyright (C) 2012-2013 Eligotech BV.
*/ */

View file

@ -12,42 +12,55 @@ import akka.actor.ExtendedActorSystem
import akka.japi.Util.immutableSeq import akka.japi.Util.immutableSeq
import akka.persistence._ import akka.persistence._
import akka.persistence.serialization.MessageFormats._ import akka.persistence.serialization.MessageFormats._
import akka.persistence.serialization.MessageFormats.DeliverMessage.ResolveStrategy
import akka.serialization._ import akka.serialization._
/** /**
* Protobuf serializer for [[Persistent]] and `Confirm` messages. * Marker trait for all protobuf-serializable messages in `akka.persistence`.
*/
trait Message extends Serializable
/**
* Protobuf serializer for [[PersistentBatch]], [[PersistentRepr]] and [[Deliver]] messages.
*/ */
class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
import PersistentImpl.Undefined import PersistentRepr.Undefined
val PersistentBatchClass = classOf[PersistentBatch] val PersistentBatchClass = classOf[PersistentBatch]
val PersistentClass = classOf[PersistentImpl] val PersistentReprClass = classOf[PersistentRepr]
val PersistentImplClass = classOf[PersistentImpl]
val ConfirmablePersistentImplClass = classOf[ConfirmablePersistentImpl]
val ConfirmClass = classOf[Confirm] val ConfirmClass = classOf[Confirm]
val DeliverClass = classOf[Deliver]
def identifier: Int = 7 def identifier: Int = 7
def includeManifest: Boolean = true def includeManifest: Boolean = true
/** /**
* Serializes [[PersistentBatch]] and [[Persistent]]. Delegates serialization of a * Serializes [[PersistentBatch]], [[PersistentRepr]] and [[Deliver]] messages. Delegates
* persistent message's payload to a matching `akka.serialization.Serializer`. * serialization of a persistent message's payload to a matching `akka.serialization.Serializer`.
*/ */
def toBinary(o: AnyRef): Array[Byte] = o match { def toBinary(o: AnyRef): Array[Byte] = o match {
case b: PersistentBatch persistentMessageBatchBuilder(b).build().toByteArray case b: PersistentBatch persistentMessageBatchBuilder(b).build().toByteArray
case p: PersistentImpl persistentMessageBuilder(p).build().toByteArray case p: PersistentRepr persistentMessageBuilder(p).build().toByteArray
case c: Confirm confirmMessageBuilder(c).build().toByteArray case c: Confirm confirmMessageBuilder(c).build().toByteArray
case d: Deliver deliverMessageBuilder(d).build.toByteArray
case _ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}") case _ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}")
} }
/** /**
* Deserializes [[PersistentBatch]] and [[Persistent]]. Delegates deserialization of a * Deserializes [[PersistentBatch]], [[PersistentRepr]] and [[Deliver]] messages. Delegates
* persistent message's payload to a matching `akka.serialization.Serializer`. * deserialization of a persistent message's payload to a matching `akka.serialization.Serializer`.
*/ */
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = manifest match { def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): Message = manifest match {
case None persistent(PersistentMessage.parseFrom(bytes)) case None persistent(PersistentMessage.parseFrom(bytes))
case Some(c) c match { case Some(c) c match {
case PersistentImplClass persistent(PersistentMessage.parseFrom(bytes))
case ConfirmablePersistentImplClass persistent(PersistentMessage.parseFrom(bytes))
case PersistentReprClass persistent(PersistentMessage.parseFrom(bytes))
case PersistentBatchClass persistentBatch(PersistentMessageBatch.parseFrom(bytes)) case PersistentBatchClass persistentBatch(PersistentMessageBatch.parseFrom(bytes))
case PersistentClass persistent(PersistentMessage.parseFrom(bytes))
case ConfirmClass confirm(ConfirmMessage.parseFrom(bytes)) case ConfirmClass confirm(ConfirmMessage.parseFrom(bytes))
case DeliverClass deliver(DeliverMessage.parseFrom(bytes))
case _ throw new IllegalArgumentException(s"Can't deserialize object of type ${c}") case _ throw new IllegalArgumentException(s"Can't deserialize object of type ${c}")
} }
} }
@ -56,17 +69,27 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
// toBinary helpers // toBinary helpers
// //
private def deliverMessageBuilder(deliver: Deliver) = {
val builder = DeliverMessage.newBuilder
builder.setPersistent(persistentMessageBuilder(deliver.persistent.asInstanceOf[PersistentRepr]))
builder.setDestination(Serialization.serializedActorPath(deliver.destination))
deliver.resolve match {
case Resolve.Off builder.setResolve(DeliverMessage.ResolveStrategy.Off)
case Resolve.Sender builder.setResolve(DeliverMessage.ResolveStrategy.Sender)
case Resolve.Destination builder.setResolve(DeliverMessage.ResolveStrategy.Destination)
}
}
private def persistentMessageBatchBuilder(persistentBatch: PersistentBatch) = { private def persistentMessageBatchBuilder(persistentBatch: PersistentBatch) = {
val builder = PersistentMessageBatch.newBuilder val builder = PersistentMessageBatch.newBuilder
persistentBatch.persistentImplList.foreach(p builder.addBatch(persistentMessageBuilder(p))) persistentBatch.persistentReprList.foreach(p builder.addBatch(persistentMessageBuilder(p)))
builder builder
} }
private def persistentMessageBuilder(persistent: PersistentImpl) = { private def persistentMessageBuilder(persistent: PersistentRepr) = {
val builder = PersistentMessage.newBuilder val builder = PersistentMessage.newBuilder
if (persistent.processorId != Undefined) builder.setProcessorId(persistent.processorId) if (persistent.processorId != Undefined) builder.setProcessorId(persistent.processorId)
if (persistent.channelId != Undefined) builder.setChannelId(persistent.channelId)
if (persistent.confirmMessage != null) builder.setConfirmMessage(confirmMessageBuilder(persistent.confirmMessage)) if (persistent.confirmMessage != null) builder.setConfirmMessage(confirmMessageBuilder(persistent.confirmMessage))
if (persistent.confirmTarget != null) builder.setConfirmTarget(Serialization.serializedActorPath(persistent.confirmTarget)) if (persistent.confirmTarget != null) builder.setConfirmTarget(Serialization.serializedActorPath(persistent.confirmTarget))
if (persistent.sender != null) builder.setSender(Serialization.serializedActorPath(persistent.sender)) if (persistent.sender != null) builder.setSender(Serialization.serializedActorPath(persistent.sender))
@ -77,6 +100,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
builder.setSequenceNr(persistent.sequenceNr) builder.setSequenceNr(persistent.sequenceNr)
builder.setDeleted(persistent.deleted) builder.setDeleted(persistent.deleted)
builder.setResolved(persistent.resolved) builder.setResolved(persistent.resolved)
builder.setConfirmable(persistent.confirmable)
builder builder
} }
@ -102,18 +126,29 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
// fromBinary helpers // fromBinary helpers
// //
private def deliver(deliverMessage: DeliverMessage): Deliver = {
Deliver(
persistent(deliverMessage.getPersistent),
system.provider.resolveActorRef(deliverMessage.getDestination),
deliverMessage.getResolve match {
case ResolveStrategy.Off Resolve.Off
case ResolveStrategy.Sender Resolve.Sender
case ResolveStrategy.Destination Resolve.Destination
})
}
private def persistentBatch(persistentMessageBatch: PersistentMessageBatch): PersistentBatch = private def persistentBatch(persistentMessageBatch: PersistentMessageBatch): PersistentBatch =
PersistentBatch(immutableSeq(persistentMessageBatch.getBatchList).map(persistent)) PersistentBatch(immutableSeq(persistentMessageBatch.getBatchList).map(persistent))
private def persistent(persistentMessage: PersistentMessage): PersistentImpl = { private def persistent(persistentMessage: PersistentMessage): PersistentRepr = {
PersistentImpl( PersistentRepr(
payload(persistentMessage.getPayload), payload(persistentMessage.getPayload),
persistentMessage.getSequenceNr, persistentMessage.getSequenceNr,
if (persistentMessage.hasProcessorId) persistentMessage.getProcessorId else Undefined, if (persistentMessage.hasProcessorId) persistentMessage.getProcessorId else Undefined,
if (persistentMessage.hasChannelId) persistentMessage.getChannelId else Undefined,
persistentMessage.getDeleted, persistentMessage.getDeleted,
persistentMessage.getResolved, persistentMessage.getResolved,
immutableSeq(persistentMessage.getConfirmsList), immutableSeq(persistentMessage.getConfirmsList),
persistentMessage.getConfirmable,
if (persistentMessage.hasConfirmMessage) confirm(persistentMessage.getConfirmMessage) else null, if (persistentMessage.hasConfirmMessage) confirm(persistentMessage.getConfirmMessage) else null,
if (persistentMessage.hasConfirmTarget) system.provider.resolveActorRef(persistentMessage.getConfirmTarget) else null, if (persistentMessage.hasConfirmTarget) system.provider.resolveActorRef(persistentMessage.getConfirmTarget) else null,
if (persistentMessage.hasSender) system.provider.resolveActorRef(persistentMessage.getSender) else null) if (persistentMessage.hasSender) system.provider.resolveActorRef(persistentMessage.getSender) else null)

View file

@ -1,4 +1,5 @@
/** /**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
* Copyright (C) 2012-2013 Eligotech BV. * Copyright (C) 2012-2013 Eligotech BV.
*/ */

View file

@ -1,4 +1,5 @@
/** /**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
* Copyright (C) 2012-2013 Eligotech BV. * Copyright (C) 2012-2013 Eligotech BV.
*/ */
@ -47,9 +48,7 @@ trait SnapshotStore extends Actor {
//#snapshot-store-plugin-api //#snapshot-store-plugin-api
/** /**
* Plugin API. * Plugin API: asynchronously loads a snapshot.
*
* Asynchronously loads a snapshot.
* *
* @param processorId processor id. * @param processorId processor id.
* @param criteria selection criteria for loading. * @param criteria selection criteria for loading.
@ -57,9 +56,7 @@ trait SnapshotStore extends Actor {
def loadAsync(processorId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] def loadAsync(processorId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]]
/** /**
* Plugin API. * Plugin API: asynchronously saves a snapshot.
*
* Asynchronously saves a snapshot.
* *
* @param metadata snapshot metadata. * @param metadata snapshot metadata.
* @param snapshot snapshot. * @param snapshot snapshot.
@ -67,18 +64,14 @@ trait SnapshotStore extends Actor {
def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit]
/** /**
* Plugin API. * Plugin API: called after successful saving of a snapshot.
*
* Called after successful saving of a snapshot.
* *
* @param metadata snapshot metadata. * @param metadata snapshot metadata.
*/ */
def saved(metadata: SnapshotMetadata) def saved(metadata: SnapshotMetadata)
/** /**
* Plugin API. * Plugin API: deletes the snapshot identified by `metadata`.
*
* Deletes the snapshot identified by `metadata`.
* *
* @param metadata snapshot metadata. * @param metadata snapshot metadata.
*/ */

View file

@ -11,9 +11,7 @@ import akka.persistence._
import akka.persistence.snapshot.{ SnapshotStore SSnapshotStore } import akka.persistence.snapshot.{ SnapshotStore SSnapshotStore }
/** /**
* Java API. * Java API: abstract snapshot store.
*
* Abstract snapshot store.
*/ */
abstract class SnapshotStore extends SSnapshotStore with SnapshotStorePlugin { abstract class SnapshotStore extends SSnapshotStore with SnapshotStorePlugin {
import context.dispatcher import context.dispatcher

View file

@ -1,4 +1,5 @@
/** /**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
* Copyright (C) 2012-2013 Eligotech BV. * Copyright (C) 2012-2013 Eligotech BV.
*/ */

View file

@ -10,9 +10,9 @@ import akka.actor._
import akka.testkit._ import akka.testkit._
object ChannelSpec { object ChannelSpec {
class TestProcessor(name: String) extends NamedProcessor(name) { class TestProcessor(name: String, channelProps: Props) extends NamedProcessor(name) {
val destination = context.actorOf(Props[TestDestination]) val destination = context.actorOf(Props[TestDestination])
val channel = context.actorOf(Channel.props("channel")) val channel = context.actorOf(channelProps)
def receive = { def receive = {
case m @ Persistent(s: String, _) if s.startsWith("a") { case m @ Persistent(s: String, _) if s.startsWith("a") {
@ -38,6 +38,14 @@ object ChannelSpec {
case Persistent(payload, _) testActor ! payload case Persistent(payload, _) testActor ! payload
} }
} }
class TestDestinationProcessor(name: String) extends NamedProcessor(name) {
def receive = {
case cp @ ConfirmablePersistent("a", _) cp.confirm()
case cp @ ConfirmablePersistent("b", _) cp.confirm()
case cp @ ConfirmablePersistent("boom", _) if (recoveryFinished) throw new TestException("boom")
}
}
} }
abstract class ChannelSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender { abstract class ChannelSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender {
@ -50,46 +58,78 @@ abstract class ChannelSpec(config: Config) extends AkkaSpec(config) with Persist
val forwardProbe = TestProbe() val forwardProbe = TestProbe()
val replyProbe = TestProbe() val replyProbe = TestProbe()
val processor = system.actorOf(Props(classOf[TestProcessor], name)) val processor = system.actorOf(Props(classOf[TestProcessor], name, channelProps(s"${name}-channel")))
system.eventStream.subscribe(confirmProbe.ref, classOf[Confirm]) subscribeToConfirmation(confirmProbe)
processor tell (Persistent("a1"), forwardProbe.ref) processor tell (Persistent("a1"), forwardProbe.ref)
processor tell (Persistent("b1"), replyProbe.ref) processor tell (Persistent("b1"), replyProbe.ref)
forwardProbe.expectMsgPF() { case m @ Persistent("fw: a1", _) m.confirm() } forwardProbe.expectMsgPF() { case m @ ConfirmablePersistent("fw: a1", _) m.confirm() }
replyProbe.expectMsgPF() { case m @ Persistent("re: b1", _) m.confirm() } replyProbe.expectMsgPF() { case m @ ConfirmablePersistent("re: b1", _) m.confirm() }
// wait for confirmations to be stored by journal (needed awaitConfirmation(confirmProbe)
// for replay so that channels can drop confirmed messages) awaitConfirmation(confirmProbe)
confirmProbe.expectMsgType[Confirm]
confirmProbe.expectMsgType[Confirm]
} }
def actorRefFor(topLevelName: String) = { def actorRefFor(topLevelName: String) =
extension.system.provider.resolveActorRef(RootActorPath(Address("akka", system.name)) / "user" / topLevelName) extension.system.provider.resolveActorRef(RootActorPath(Address("akka", system.name)) / "user" / topLevelName)
}
def channelProps(channelId: String): Props =
Channel.props(channelId)
def subscribeToConfirmation(probe: TestProbe): Unit =
system.eventStream.subscribe(probe.ref, classOf[Confirm])
def awaitConfirmation(probe: TestProbe): Unit =
probe.expectMsgType[Confirm]
"A channel" must { "A channel" must {
"forward un-confirmed messages to destination" in { "forward new messages to destination" in {
val processor = system.actorOf(Props(classOf[TestProcessor], name)) val processor = system.actorOf(Props(classOf[TestProcessor], name, channelProps(s"${name}-channel")))
processor ! Persistent("a2") processor ! Persistent("a2")
expectMsgPF() { case m @ Persistent("fw: a2", _) m.confirm() } expectMsgPF() { case m @ ConfirmablePersistent("fw: a2", _) m.confirm() }
} }
"reply un-confirmed messages to senders" in { "reply new messages to senders" in {
val processor = system.actorOf(Props(classOf[TestProcessor], name)) val processor = system.actorOf(Props(classOf[TestProcessor], name, channelProps(s"${name}-channel")))
processor ! Persistent("b2") processor ! Persistent("b2")
expectMsgPF() { case m @ Persistent("re: b2", _) m.confirm() } expectMsgPF() { case m @ ConfirmablePersistent("re: b2", _) m.confirm() }
}
"forward un-confirmed stored messages to destination during recovery" in {
val confirmProbe = TestProbe()
val forwardProbe = TestProbe()
subscribeToConfirmation(confirmProbe)
val processor1 = system.actorOf(Props(classOf[TestProcessor], name, channelProps(s"${name}-channel")))
processor1 tell (Persistent("a1"), forwardProbe.ref)
processor1 tell (Persistent("a2"), forwardProbe.ref)
forwardProbe.expectMsgPF() { case m @ ConfirmablePersistent("fw: a1", _) /* no confirmation */ }
forwardProbe.expectMsgPF() { case m @ ConfirmablePersistent("fw: a2", _) m.confirm() }
awaitConfirmation(confirmProbe)
val processor2 = system.actorOf(Props(classOf[TestProcessor], name, channelProps(s"${name}-channel")))
processor2 tell (Persistent("a3"), forwardProbe.ref)
forwardProbe.expectMsgPF() { case m @ ConfirmablePersistent("fw: a1", _) m.confirm() } // sender still valid, no need to resolve
forwardProbe.expectMsgPF() { case m @ ConfirmablePersistent("fw: a3", _) m.confirm() }
awaitConfirmation(confirmProbe)
awaitConfirmation(confirmProbe)
} }
"must resolve sender references and preserve message order" in { "must resolve sender references and preserve message order" in {
val channel = system.actorOf(Channel.props()) val channel = system.actorOf(channelProps("channel-1"))
val destination = system.actorOf(Props[TestDestination]) val destination = system.actorOf(Props[TestDestination])
val empty = actorRefFor("testSender") // will be an EmptyLocalActorRef val empty = actorRefFor("testSender") // will be an EmptyLocalActorRef
val sender = system.actorOf(Props(classOf[TestReceiver], testActor), "testSender") val sender = system.actorOf(Props(classOf[TestReceiver], testActor), "testSender")
// replayed message (resolved = false) and invalid sender reference // replayed message (resolved = false) and invalid sender reference
channel tell (Deliver(PersistentImpl("a", resolved = false), destination, Resolve.Sender), empty) channel tell (Deliver(PersistentRepr("a", resolved = false), destination, Resolve.Sender), empty)
// new messages (resolved = true) and valid sender references // new messages (resolved = true) and valid sender references
channel tell (Deliver(Persistent("b"), destination), sender) channel tell (Deliver(Persistent("b"), destination), sender)
@ -100,13 +140,13 @@ abstract class ChannelSpec(config: Config) extends AkkaSpec(config) with Persist
expectMsg("c") expectMsg("c")
} }
"must resolve destination references and preserve message order" in { "must resolve destination references and preserve message order" in {
val channel = system.actorOf(Channel.props()) val channel = system.actorOf(channelProps("channel-2"))
val empty = actorRefFor("testDestination") // will be an EmptyLocalActorRef val empty = actorRefFor("testDestination") // will be an EmptyLocalActorRef
val destination = system.actorOf(Props(classOf[TestReceiver], testActor), "testDestination") val destination = system.actorOf(Props(classOf[TestReceiver], testActor), "testDestination")
// replayed message (resolved = false) and invalid destination reference // replayed message (resolved = false) and invalid destination reference
channel ! Deliver(PersistentImpl("a", resolved = false), empty, Resolve.Destination) channel ! Deliver(PersistentRepr("a", resolved = false), empty, Resolve.Destination)
// new messages (resolved = true) and valid destination references // new messages (resolved = true) and valid destination references
channel ! Deliver(Persistent("b"), destination) channel ! Deliver(Persistent("b"), destination)
@ -116,8 +156,130 @@ abstract class ChannelSpec(config: Config) extends AkkaSpec(config) with Persist
expectMsg("b") expectMsg("b")
expectMsg("c") expectMsg("c")
} }
"support processors as destination" in {
val channel = system.actorOf(channelProps(s"${name}-channel-new"))
val destination = system.actorOf(Props(classOf[TestDestinationProcessor], s"${name}-new"))
val confirmProbe = TestProbe()
subscribeToConfirmation(confirmProbe)
channel ! Deliver(Persistent("a"), destination)
awaitConfirmation(confirmProbe)
}
"support processors as destination that may fail" in {
val channel = system.actorOf(channelProps(s"${name}-channel-new"))
val destination = system.actorOf(Props(classOf[TestDestinationProcessor], s"${name}-new"))
val confirmProbe = TestProbe()
subscribeToConfirmation(confirmProbe)
channel ! Deliver(Persistent("a"), destination)
channel ! Deliver(Persistent("boom"), destination)
channel ! Deliver(Persistent("b"), destination)
awaitConfirmation(confirmProbe)
awaitConfirmation(confirmProbe)
}
"accept confirmable persistent messages for delivery" in {
val channel = system.actorOf(channelProps(s"${name}-channel-new"))
val destination = system.actorOf(Props[TestDestination])
val confirmProbe = TestProbe()
subscribeToConfirmation(confirmProbe)
channel ! Deliver(PersistentRepr("a", confirmable = true), destination)
expectMsgPF() { case m @ ConfirmablePersistent("a", _) m.confirm() }
awaitConfirmation(confirmProbe)
}
}
}
abstract class PersistentChannelSpec(config: Config) extends ChannelSpec(config) {
override def channelProps(channelId: String): Props =
PersistentChannel.props(channelId)
override def subscribeToConfirmation(probe: TestProbe): Unit =
system.eventStream.subscribe(probe.ref, classOf[JournalProtocol.Delete])
override def awaitConfirmation(probe: TestProbe): Unit =
probe.expectMsgType[JournalProtocol.Delete]
"A persistent channel" must {
"support disabling and re-enabling delivery" in {
val channel = system.actorOf(channelProps(s"${name}-channel"))
val confirmProbe = TestProbe()
subscribeToConfirmation(confirmProbe)
channel ! Deliver(Persistent("a"), testActor)
expectMsgPF() { case m @ ConfirmablePersistent("a", _) m.confirm() }
awaitConfirmation(confirmProbe)
channel ! DisableDelivery
channel ! Deliver(Persistent("b"), testActor)
channel ! EnableDelivery
channel ! Deliver(Persistent("c"), testActor)
expectMsgPF() { case m @ ConfirmablePersistent("b", _) m.confirm() }
expectMsgPF() { case m @ ConfirmablePersistent("c", _) m.confirm() }
}
"support Persistent replies to Deliver senders" in {
val channel = system.actorOf(PersistentChannel.props(s"${name}-channel-new", true))
channel ! Deliver(Persistent("a"), system.deadLetters)
expectMsgPF() { case Persistent("a", 1) }
channel ! Deliver(PersistentRepr("b", sequenceNr = 13), system.deadLetters)
expectMsgPF() { case Persistent("b", 13) }
}
"must not modify certain persistent message field" in {
val channel = system.actorOf(channelProps(s"${name}-channel-new"))
val persistent1 = PersistentRepr(payload = "a", processorId = "p1", confirms = List("c1", "c2"), sender = channel, sequenceNr = 13)
val persistent2 = PersistentRepr(payload = "b", processorId = "p1", confirms = List("c1", "c2"), sender = channel)
channel ! Deliver(persistent1, testActor)
channel ! Deliver(persistent2, testActor)
expectMsgPF() { case ConfirmablePersistentImpl("a", 13, "p1", _, _, Seq("c1", "c2"), _, _, channel) }
expectMsgPF() { case ConfirmablePersistentImpl("b", 2, "p1", _, _, Seq("c1", "c2"), _, _, channel) }
}
}
"A persistent channel" when {
"used standalone" must {
"redeliver un-confirmed stored messages during recovery" in {
val confirmProbe = TestProbe()
val forwardProbe = TestProbe()
subscribeToConfirmation(confirmProbe)
val channel1 = system.actorOf(channelProps(s"${name}-channel"))
channel1 tell (Deliver(Persistent("a1"), forwardProbe.ref), null)
channel1 tell (Deliver(Persistent("a2"), forwardProbe.ref), null)
forwardProbe.expectMsgPF() { case m @ ConfirmablePersistent("a1", _) /* no confirmation */ }
forwardProbe.expectMsgPF() { case m @ ConfirmablePersistent("a2", _) m.confirm() }
awaitConfirmation(confirmProbe)
val channel2 = system.actorOf(channelProps(s"${name}-channel"))
channel2 tell (Deliver(Persistent("a3"), forwardProbe.ref), null)
forwardProbe.expectMsgPF() { case m @ ConfirmablePersistent("a1", _) m.confirm() } // sender still valid, no need to resolve
forwardProbe.expectMsgPF() { case m @ ConfirmablePersistent("a3", _) m.confirm() }
awaitConfirmation(confirmProbe)
awaitConfirmation(confirmProbe)
}
}
} }
} }
class LeveldbChannelSpec extends ChannelSpec(PersistenceSpec.config("leveldb", "channel")) class LeveldbChannelSpec extends ChannelSpec(PersistenceSpec.config("leveldb", "channel"))
class InmemChannelSpec extends ChannelSpec(PersistenceSpec.config("inmem", "channel")) class InmemChannelSpec extends ChannelSpec(PersistenceSpec.config("inmem", "channel"))
class LeveldbPersistentChannelSpec extends PersistentChannelSpec(PersistenceSpec.config("leveldb", "persistent-channel"))
class InmemPersistentChannelSpec extends PersistentChannelSpec(PersistenceSpec.config("inmem", "persistent-channel"))

View file

@ -76,8 +76,17 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(config(customSerializers
"A message serializer" when { "A message serializer" when {
"not given a manifest" must { "not given a manifest" must {
"handle custom persistent message serialization" in { "handle custom ConfirmablePersistent message serialization" in {
val persistent = PersistentImpl(MyPayload("a"), 13, "p1", "c1", true, true, Seq("c1", "c2"), Confirm("p2", 14, "c2"), testActor, testActor) val persistent = PersistentRepr(MyPayload("a"), 13, "p1", true, true, List("c1", "c2"), confirmable = true, Confirm("p2", 14, "c2"), testActor, testActor)
val serializer = serialization.findSerializerFor(persistent)
val bytes = serializer.toBinary(persistent)
val deserialized = serializer.fromBinary(bytes, None)
deserialized must be(persistent.withPayload(MyPayload(".a.")))
}
"handle custom Persistent message serialization" in {
val persistent = PersistentRepr(MyPayload("a"), 13, "p1", true, true, List("c1", "c2"), confirmable = false, Confirm("p2", 14, "c2"), testActor, testActor)
val serializer = serialization.findSerializerFor(persistent) val serializer = serialization.findSerializerFor(persistent)
val bytes = serializer.toBinary(persistent) val bytes = serializer.toBinary(persistent)
@ -86,19 +95,28 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(config(customSerializers
deserialized must be(persistent.withPayload(MyPayload(".a."))) deserialized must be(persistent.withPayload(MyPayload(".a.")))
} }
} }
"given a persistent message manifest" must { "given a PersistentRepr manifest" must {
"handle custom persistent message serialization" in { "handle custom ConfirmablePersistent message serialization" in {
val persistent = PersistentImpl(MyPayload("b"), 13, "p1", "c1", true, true, Seq("c1", "c2"), Confirm("p2", 14, "c2"), testActor, testActor) val persistent = PersistentRepr(MyPayload("b"), 13, "p1", true, true, List("c1", "c2"), confirmable = true, Confirm("p2", 14, "c2"), testActor, testActor)
val serializer = serialization.findSerializerFor(persistent) val serializer = serialization.findSerializerFor(persistent)
val bytes = serializer.toBinary(persistent) val bytes = serializer.toBinary(persistent)
val deserialized = serializer.fromBinary(bytes, Some(classOf[PersistentImpl])) val deserialized = serializer.fromBinary(bytes, Some(classOf[PersistentRepr]))
deserialized must be(persistent.withPayload(MyPayload(".b.")))
}
"handle custom Persistent message serialization" in {
val persistent = PersistentRepr(MyPayload("b"), 13, "p1", true, true, List("c1", "c2"), confirmable = true, Confirm("p2", 14, "c2"), testActor, testActor)
val serializer = serialization.findSerializerFor(persistent)
val bytes = serializer.toBinary(persistent)
val deserialized = serializer.fromBinary(bytes, Some(classOf[PersistentRepr]))
deserialized must be(persistent.withPayload(MyPayload(".b."))) deserialized must be(persistent.withPayload(MyPayload(".b.")))
} }
} }
"given a confirmation message manifest" must { "given a Confirm manifest" must {
"handle confirmation message serialization" in { "handle Confirm message serialization" in {
val confirmation = Confirm("x", 2, "y") val confirmation = Confirm("x", 2, "y")
val serializer = serialization.findSerializerFor(confirmation) val serializer = serialization.findSerializerFor(confirmation)
@ -120,8 +138,9 @@ object MessageSerializerRemotingSpec {
class RemoteActor extends Actor { class RemoteActor extends Actor {
def receive = { def receive = {
case PersistentBatch(Persistent(MyPayload(data), _) +: tail) sender ! data case PersistentBatch(Persistent(MyPayload(data), _) +: tail) sender ! s"b${data}"
case Persistent(MyPayload(data), _) sender ! data case ConfirmablePersistent(MyPayload(data), _) sender ! s"c${data}"
case Persistent(MyPayload(data), _) sender ! s"p${data}"
case Confirm(pid, snr, cid) sender ! s"${pid},${snr},${cid}" case Confirm(pid, snr, cid) sender ! s"${pid},${snr},${cid}"
} }
} }
@ -146,15 +165,19 @@ class MessageSerializerRemotingSpec extends AkkaSpec(config(systemA).withFallbac
} }
"A message serializer" must { "A message serializer" must {
"custom-serialize persistent messages during remoting" in { "custom-serialize Persistent messages during remoting" in {
localActor ! Persistent(MyPayload("a")) localActor ! Persistent(MyPayload("a"))
expectMsg(".a.") expectMsg("p.a.")
} }
"custom-serialize persistent message batches during remoting" in { "custom-serialize ConfirmablePersistent messages during remoting" in {
localActor ! PersistentRepr(MyPayload("a"), confirmable = true)
expectMsg("c.a.")
}
"custom-serialize Persistent message batches during remoting" in {
localActor ! PersistentBatch(immutable.Seq(Persistent(MyPayload("a")))) localActor ! PersistentBatch(immutable.Seq(Persistent(MyPayload("a"))))
expectMsg(".a.") expectMsg("b.a.")
} }
"serialize confirmation messages during remoting" in { "serialize Confirm messages during remoting" in {
localActor ! Confirm("a", 2, "b") localActor ! Confirm("a", 2, "b")
expectMsg("a,2,b") expectMsg("a,2,b")
} }

View file

@ -30,8 +30,8 @@ public class ProcessorChannelExample {
public static class ExampleDestination extends UntypedActor { public static class ExampleDestination extends UntypedActor {
@Override @Override
public void onReceive(Object message) throws Exception { public void onReceive(Object message) throws Exception {
if (message instanceof Persistent) { if (message instanceof ConfirmablePersistent) {
Persistent msg = (Persistent)message; ConfirmablePersistent msg = (ConfirmablePersistent)message;
msg.confirm(); msg.confirm();
System.out.println("received " + msg.payload()); System.out.println("received " + msg.payload());
} }

View file

@ -16,14 +16,14 @@ object ConversationRecoveryExample extends App {
var counter = 0 var counter = 0
def receive = { def receive = {
case m @ Persistent(Ping, _) { case m @ ConfirmablePersistent(Ping, _) {
counter += 1 counter += 1
println(s"received ping ${counter} times ...") println(s"received ping ${counter} times ...")
m.confirm() m.confirm()
if (!recoveryRunning) Thread.sleep(2000) if (!recoveryRunning) Thread.sleep(1000)
pongChannel ! Deliver(m.withPayload(Pong), sender, Resolve.Destination) pongChannel ! Deliver(m.withPayload(Pong), sender, Resolve.Destination)
} }
case "init" if (counter == 0) self forward Persistent(Ping) case "init" if (counter == 0) pongChannel ! Deliver(Persistent(Pong), sender)
} }
override def preStart() = () override def preStart() = ()
@ -34,11 +34,11 @@ object ConversationRecoveryExample extends App {
var counter = 0 var counter = 0
def receive = { def receive = {
case m @ Persistent(Pong, _) { case m @ ConfirmablePersistent(Pong, _) {
counter += 1 counter += 1
println(s"received pong ${counter} times ...") println(s"received pong ${counter} times ...")
m.confirm() m.confirm()
if (!recoveryRunning) Thread.sleep(2000) if (!recoveryRunning) Thread.sleep(1000)
pingChannel ! Deliver(m.withPayload(Ping), sender, Resolve.Destination) pingChannel ! Deliver(m.withPayload(Ping), sender, Resolve.Destination)
} }
} }

View file

@ -25,7 +25,7 @@ object ProcessorChannelExample extends App {
class ExampleDestination extends Actor { class ExampleDestination extends Actor {
def receive = { def receive = {
case p @ Persistent(payload, snr) { case p @ ConfirmablePersistent(payload, snr) {
println(s"received ${payload}") println(s"received ${payload}")
sender ! s"re: ${payload} (${snr})" sender ! s"re: ${payload} (${snr})"
p.confirm() p.confirm()