+per #15327 Add AtLeastOnceDelivery trait

* also remove final of around methods, and let deliver send when not recoveryRunning

(cherry picked from commit 312b0d107a179accaf135f64ed9c3b78f3e351d1)
This commit is contained in:
Patrik Nordwall 2014-06-03 15:10:56 +02:00
parent 4ad346afd4
commit 32ca608c97
18 changed files with 2969 additions and 15 deletions

View file

@ -287,13 +287,13 @@ delivery is an explicit ACKRETRY protocol. In its simplest form this requires
The third becomes necessary by virtue of the acknowledgements not being guaranteed
to arrive either. An ACK-RETRY protocol with business-level acknowledgements is
supported by :ref:`channels` of the Akka Persistence module. Duplicates can be
detected by tracking the sequence numbers of messages received via channels.
supported by :ref:`at-least-once-delivery` of the Akka Persistence module. Duplicates can be
detected by tracking the identifiers of messages sent via :ref:`at-least-once-delivery`.
Another way of implementing the third part would be to make processing the messages
idempotent on the level of the business logic.
Another example of implementing all three requirements is shown at
:ref:`reliable-proxy` (which is now superseded by :ref:`channels`).
:ref:`reliable-proxy` (which is now superseded by :ref:`at-least-once-delivery`).
Event Sourcing
--------------

View file

@ -4,14 +4,17 @@
package docs.persistence;
import akka.actor.ActorPath;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.japi.Function;
import akka.japi.Procedure;
import akka.persistence.*;
import scala.Option;
import scala.concurrent.duration.Duration;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
@ -163,6 +166,103 @@ public class PersistenceDocTest {
}
};
static Object atLeastOnceExample = new Object() {
//#at-least-once-example
class Msg implements Serializable {
public final long deliveryId;
public final String s;
public Msg(long deliveryId, String s) {
this.deliveryId = deliveryId;
this.s = s;
}
}
class Confirm implements Serializable {
public final long deliveryId;
public Confirm(long deliveryId) {
this.deliveryId = deliveryId;
}
}
class MsgSent implements Serializable {
public final String s;
public MsgSent(String s) {
this.s = s;
}
}
class MsgConfirmed implements Serializable {
public final long deliveryId;
public MsgConfirmed(long deliveryId) {
this.deliveryId = deliveryId;
}
}
class MyPersistentActor extends UntypedPersistentActorWithAtLeastOnceDelivery {
private final ActorPath destination;
public MyPersistentActor(ActorPath destination) {
this.destination = destination;
}
public void onReceiveCommand(Object message) {
if (message instanceof String) {
String s = (String) message;
persist(new MsgSent(s), new Procedure<MsgSent>() {
public void apply(MsgSent evt) {
updateState(evt);
}
});
} else if (message instanceof Confirm) {
Confirm confirm = (Confirm) message;
persist(new MsgConfirmed(confirm.deliveryId), new Procedure<MsgConfirmed>() {
public void apply(MsgConfirmed evt) {
updateState(evt);
}
});
} else {
unhandled(message);
}
}
public void onReceiveRecover(Object event) {
updateState(event);
}
void updateState(Object event) {
if (event instanceof MsgSent) {
final MsgSent evt = (MsgSent) event;
deliver(destination, new Function<Long, Object>() {
public Object apply(Long deliveryId) {
return new Msg(deliveryId, evt.s);
}
});
} else if (event instanceof MsgConfirmed) {
final MsgConfirmed evt = (MsgConfirmed) event;
confirmDelivery(evt.deliveryId);
}
}
}
class MyDestination extends UntypedActor {
public void onReceive(Object message) throws Exception {
if (message instanceof Msg) {
Msg msg = (Msg) message;
// ...
getSender().tell(new Confirm(msg.deliveryId), getSelf());
} else {
unhandled(message);
}
}
}
//#at-least-once-example
};
static Object o3 = new Object() {
//#channel-example
class MyProcessor extends UntypedProcessor {

View file

@ -370,6 +370,79 @@ A persistent actor can delete individual snapshots by calling the ``deleteSnapsh
timestamp of a snapshot as argument. To bulk-delete snapshots matching ``SnapshotSelectionCriteria``, persistent actors should
use the ``deleteSnapshots`` method.
.. _at-least-once-delivery-java-lambda:
At-Least-Once Delivery
======================
To send messages with at-least-once delivery semantics to destinations you can extend the ``AbstractPersistentActorWithAtLeastOnceDelivery``
class instead of ``AbstractPersistentActor`` on the sending side. It takes care of re-sending messages when they
have not been confirmed within a configurable timeout.
.. note::
At-least-once delivery implies that original message send order is not always preserved
and the destination may receive duplicate messages. That means that the
semantics do not match those of a normal :class:`ActorRef` send operation:
* it is not at-most-once delivery
* message order for the same senderreceiver pair is not preserved due to
possible resends
* after a crash and restart of the destination messages are still
delivered—to the new actor incarnation
These semantics is similar to what an :class:`ActorPath` represents (see
:ref:`actor-lifecycle-scala`), therefore you need to supply a path and not a
reference when delivering messages. The messages are sent to the path with
an actor selection.
Use the ``deliver`` method to send a message to a destination. Call the ``confirmDelivery`` method
when the destination has replied with a confirmation message.
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#at-least-once-example
Correlation between ``deliver`` and ``confirmDelivery`` is performed with the ``deliveryId`` that is provided
as parameter to the ``deliveryIdToMessage`` function. The ``deliveryId`` is typically passed in the message to the
destination, which replies with a message containing the same ``deliveryId``.
The ``deliveryId`` is a strictly monotonically increasing sequence number without gaps. The same sequence is
used for all destinations of the actor, i.e. when sending to multiple destinations the destinations will see
gaps in the sequence if no translation is performed.
The ``AbstractPersistentActorWithAtLeastOnceDelivery`` class has a state consisting of unconfirmed messages and a
sequence number. It does not store this state itself. You must persist events corresponding to the
``deliver`` and ``confirmDelivery`` invocations from your ``PersistentActor`` so that the state can
be restored by calling the same methods during the recovery phase of the ``PersistentActor``. Sometimes
these events can be derived from other business level events, and sometimes you must create separate events.
During recovery calls to ``delivery`` will not send out the message, but it will be sent later
if no matching ``confirmDelivery`` was performed.
Support for snapshots is provided by ``getDeliverySnapshot`` and ``setDeliverySnapshot``.
The ``AtLeastOnceDeliverySnapshot`` contains the full delivery state, including unconfirmed messages.
If you need a custom snapshot for other parts of the actor state you must also include the
``AtLeastOnceDeliverySnapshot``. It is serialized using protobuf with the ordinary Akka
serialization mechanism. It is easiest to include the bytes of the ``AtLeastOnceDeliverySnapshot``
as a blob in your custom snapshot.
The interval between redelivery attempts is defined by the ``redeliverInterval`` method.
The default value can be configured with the ``akka.persistence.at-least-once-delivery.redeliver-interval``
configuration key. The method can be overridden by implementation classes to return non-default values.
After a number of delivery attempts a ``AtLeastOnceDelivery.UnconfirmedWarning`` message
will be sent to ``self``. The re-sending will still continue, but you can choose to call
``confirmDelivery`` to cancel the re-sending. The number of delivery attempts before emitting the
warning is defined by the ``warnAfterNumberOfUnconfirmedAttempts`` method. The default value can be
configured with the ``akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts``
configuration key. The method can be overridden by implementation classes to return non-default values.
The ``AbstractPersistentActorWithAtLeastOnceDelivery`` class holds messages in memory until their successful delivery has been confirmed.
The limit of maximum number of unconfirmed messages that the actor is allowed to hold in memory
is defined by the ``maxUnconfirmedMessages`` method. If this limit is exceed the ``deliver`` method will
not accept more messages and it will throw ``AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException``.
The default value can be configured with the ``akka.persistence.at-least-once-delivery.max-unconfirmed-messages``
configuration key. The method can be overridden by implementation classes to return non-default values.
Storage plugins
===============

View file

@ -376,6 +376,80 @@ A persistent actor can delete individual snapshots by calling the ``deleteSnapsh
timestamp of a snapshot as argument. To bulk-delete snapshots matching ``SnapshotSelectionCriteria``, persistent actors should
use the ``deleteSnapshots`` method.
.. _at-least-once-delivery-java:
At-Least-Once Delivery
======================
To send messages with at-least-once delivery semantics to destinations you can extend the ``UntypedPersistentActorWithAtLeastOnceDelivery``
class instead of ``UntypedPersistentActor`` on the sending side. It takes care of re-sending messages when they
have not been confirmed within a configurable timeout.
.. note::
At-least-once delivery implies that original message send order is not always preserved
and the destination may receive duplicate messages. That means that the
semantics do not match those of a normal :class:`ActorRef` send operation:
* it is not at-most-once delivery
* message order for the same senderreceiver pair is not preserved due to
possible resends
* after a crash and restart of the destination messages are still
delivered—to the new actor incarnation
These semantics is similar to what an :class:`ActorPath` represents (see
:ref:`actor-lifecycle-scala`), therefore you need to supply a path and not a
reference when delivering messages. The messages are sent to the path with
an actor selection.
Use the ``deliver`` method to send a message to a destination. Call the ``confirmDelivery`` method
when the destination has replied with a confirmation message.
.. includecode:: code/docs/persistence/PersistenceDocTest.java#at-least-once-example
Correlation between ``deliver`` and ``confirmDelivery`` is performed with the ``deliveryId`` that is provided
as parameter to the ``deliveryIdToMessage`` function. The ``deliveryId`` is typically passed in the message to the
destination, which replies with a message containing the same ``deliveryId``.
The ``deliveryId`` is a strictly monotonically increasing sequence number without gaps. The same sequence is
used for all destinations of the actor, i.e. when sending to multiple destinations the destinations will see
gaps in the sequence if no translation is performed.
The ``UntypedPersistentActorWithAtLeastOnceDelivery`` class has a state consisting of unconfirmed messages and a
sequence number. It does not store this state itself. You must persist events corresponding to the
``deliver`` and ``confirmDelivery`` invocations from your ``PersistentActor`` so that the state can
be restored by calling the same methods during the recovery phase of the ``PersistentActor``. Sometimes
these events can be derived from other business level events, and sometimes you must create separate events.
During recovery calls to ``delivery`` will not send out the message, but it will be sent later
if no matching ``confirmDelivery`` was performed.
Support for snapshots is provided by ``getDeliverySnapshot`` and ``setDeliverySnapshot``.
The ``AtLeastOnceDeliverySnapshot`` contains the full delivery state, including unconfirmed messages.
If you need a custom snapshot for other parts of the actor state you must also include the
``AtLeastOnceDeliverySnapshot``. It is serialized using protobuf with the ordinary Akka
serialization mechanism. It is easiest to include the bytes of the ``AtLeastOnceDeliverySnapshot``
as a blob in your custom snapshot.
The interval between redelivery attempts is defined by the ``redeliverInterval`` method.
The default value can be configured with the ``akka.persistence.at-least-once-delivery.redeliver-interval``
configuration key. The method can be overridden by implementation classes to return non-default values.
After a number of delivery attempts a ``AtLeastOnceDelivery.UnconfirmedWarning`` message
will be sent to ``self``. The re-sending will still continue, but you can choose to call
``confirmDelivery`` to cancel the re-sending. The number of delivery attempts before emitting the
warning is defined by the ``warnAfterNumberOfUnconfirmedAttempts`` method. The default value can be
configured with the ``akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts``
configuration key. The method can be overridden by implementation classes to return non-default values.
The ``UntypedPersistentActorWithAtLeastOnceDelivery`` class holds messages in memory until their successful delivery has been confirmed.
The limit of maximum number of unconfirmed messages that the actor is allowed to hold in memory
is defined by the ``maxUnconfirmedMessages`` method. If this limit is exceed the ``deliver`` method will
not accept more messages and it will throw ``AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException``.
The default value can be configured with the ``akka.persistence.at-least-once-delivery.max-unconfirmed-messages``
configuration key. The method can be overridden by implementation classes to return non-default values.
Storage plugins
===============

View file

@ -9,7 +9,6 @@ import akka.persistence._
import scala.concurrent.duration._
import scala.language.postfixOps
trait PersistenceDocSpec {
val config =
"""
@ -133,6 +132,48 @@ trait PersistenceDocSpec {
}
}
new AnyRef {
//#at-least-once-example
import akka.actor.{ Actor, ActorPath, Props }
import akka.persistence.AtLeastOnceDelivery
case class Msg(deliveryId: Long, s: String)
case class Confirm(deliveryId: Long)
sealed trait Evt
case class MsgSent(s: String) extends Evt
case class MsgConfirmed(deliveryId: Long) extends Evt
class MyPersistentActor(destination: ActorPath)
extends PersistentActor with AtLeastOnceDelivery {
def receiveCommand: Receive = {
case s: String => persist(MsgSent(s))(updateState)
case Confirm(deliveryId) => persist(MsgConfirmed(deliveryId))(updateState)
}
def receiveRecover: Receive = {
case evt: Evt => updateState(evt)
}
def updateState(evt: Evt): Unit = evt match {
case MsgSent(s) =>
deliver(destination, deliveryId => Msg(deliveryId, s))
case MsgConfirmed(deliveryId) => confirmDelivery(deliveryId)
}
}
class MyDestination extends Actor {
def receive = {
case Msg(deliveryId, s) =>
// ...
sender() ! Confirm(deliveryId)
}
}
//#at-least-once-example
}
new AnyRef {
//#channel-example
import akka.actor.{ Actor, Props }

View file

@ -409,6 +409,79 @@ A persistent actor can delete individual snapshots by calling the ``deleteSnapsh
timestamp of a snapshot as argument. To bulk-delete snapshots matching ``SnapshotSelectionCriteria``, persistent actors should
use the ``deleteSnapshots`` method.
.. _at-least-once-delivery:
At-Least-Once Delivery
======================
To send messages with at-least-once delivery semantics to destinations you can add the ``AtLeastOnceDelivery``
trait to your ``PersistentActor`` on the sending side. It takes care of re-sending messages when they
have not been confirmed within a configurable timeout.
.. note::
At-least-once delivery implies that original message send order is not always preserved
and the destination may receive duplicate messages. That means that the
semantics do not match those of a normal :class:`ActorRef` send operation:
* it is not at-most-once delivery
* message order for the same senderreceiver pair is not preserved due to
possible resends
* after a crash and restart of the destination messages are still
delivered—to the new actor incarnation
These semantics is similar to what an :class:`ActorPath` represents (see
:ref:`actor-lifecycle-scala`), therefore you need to supply a path and not a
reference when delivering messages. The messages are sent to the path with
an actor selection.
Use the ``deliver`` method to send a message to a destination. Call the ``confirmDelivery`` method
when the destination has replied with a confirmation message.
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#at-least-once-example
Correlation between ``deliver`` and ``confirmDelivery`` is performed with the ``deliveryId`` that is provided
as parameter to the ``deliveryIdToMessage`` function. The ``deliveryId`` is typically passed in the message to the
destination, which replies with a message containing the same ``deliveryId``.
The ``deliveryId`` is a strictly monotonically increasing sequence number without gaps. The same sequence is
used for all destinations of the actor, i.e. when sending to multiple destinations the destinations will see
gaps in the sequence if no translation is performed.
The ``AtLeastOnceDelivery`` trait has a state consisting of unconfirmed messages and a
sequence number. It does not store this state itself. You must persist events corresponding to the
``deliver`` and ``confirmDelivery`` invocations from your ``PersistentActor`` so that the state can
be restored by calling the same methods during the recovery phase of the ``PersistentActor``. Sometimes
these events can be derived from other business level events, and sometimes you must create separate events.
During recovery calls to ``delivery`` will not send out the message, but it will be sent later
if no matching ``confirmDelivery`` was performed.
Support for snapshots is provided by ``getDeliverySnapshot`` and ``setDeliverySnapshot``.
The ``AtLeastOnceDeliverySnapshot`` contains the full delivery state, including unconfirmed messages.
If you need a custom snapshot for other parts of the actor state you must also include the
``AtLeastOnceDeliverySnapshot``. It is serialized using protobuf with the ordinary Akka
serialization mechanism. It is easiest to include the bytes of the ``AtLeastOnceDeliverySnapshot``
as a blob in your custom snapshot.
The interval between redelivery attempts is defined by the ``redeliverInterval`` method.
The default value can be configured with the ``akka.persistence.at-least-once-delivery.redeliver-interval``
configuration key. The method can be overridden by implementation classes to return non-default values.
After a number of delivery attempts a ``AtLeastOnceDelivery.UnconfirmedWarning`` message
will be sent to ``self``. The re-sending will still continue, but you can choose to call
``confirmDelivery`` to cancel the re-sending. The number of delivery attempts before emitting the
warning is defined by the ``warnAfterNumberOfUnconfirmedAttempts`` method. The default value can be
configured with the ``akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts``
configuration key. The method can be overridden by implementation classes to return non-default values.
The ``AtLeastOnceDelivery`` trait holds messages in memory until their successful delivery has been confirmed.
The limit of maximum number of unconfirmed messages that the actor is allowed to hold in memory
is defined by the ``maxUnconfirmedMessages`` method. If this limit is exceed the ``deliver`` method will
not accept more messages and it will throw ``AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException``.
The default value can be configured with the ``akka.persistence.at-least-once-delivery.max-unconfirmed-messages``
configuration key. The method can be overridden by implementation classes to return non-default values.
.. _storage-plugins:

View file

@ -40,3 +40,14 @@ message DeliverMessage {
optional PersistentMessage persistent = 1;
optional string destination = 2;
}
message AtLeastOnceDeliverySnapshot {
message UnconfirmedDelivery {
required int64 deliveryId = 1;
required string destination = 2;
required PersistentPayload payload = 3;
}
required int64 currentDeliveryId = 1;
repeated UnconfirmedDelivery unconfirmedDeliveries = 2;
}

View file

@ -146,6 +146,19 @@ akka {
auto-update-replay-max = -1
}
at-least-once-delivery {
# Interval between redelivery attempts
redeliver-interval = 5s
# After this number of delivery attempts a `ReliableRedelivery.UnconfirmedWarning`
# message will be sent to the actor.
warn-after-number-of-unconfirmed-attempts = 5
# Maximum number of unconfirmed messages that an actor with AtLeastOnceDelivery is
# allowed to hold in memory.
max-unconfirmed-messages = 100000
}
dispatchers {
default-plugin-dispatcher {
type = PinnedDispatcher

View file

@ -0,0 +1,355 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import scala.annotation.tailrec
import scala.collection.breakOut
import scala.collection.immutable
import scala.concurrent.duration.FiniteDuration
import akka.actor.Actor
import akka.actor.ActorPath
import akka.persistence.serialization.Message
object AtLeastOnceDelivery {
/**
* Snapshot of current `AtLeastOnceDelivery` state. Can be retrieved with
* [[AtLeastOnceDelivery#getDeliverySnapshot]] and saved with [[PersistentActor#saveSnapshot]].
* During recovery the snapshot received in [[SnapshotOffer]] should be set
* with [[AtLeastOnceDelivery.setDeliverySnapshot]].
*/
@SerialVersionUID(1L)
case class AtLeastOnceDeliverySnapshot(currentDeliveryId: Long, unconfirmedDeliveries: immutable.Seq[UnconfirmedDelivery])
extends Message {
/**
* Java API
*/
def getUnconfirmedDeliveries: java.util.List[UnconfirmedDelivery] = {
import scala.collection.JavaConverters._
unconfirmedDeliveries.asJava
}
}
/**
* @see [[AtLeastOnceDelivery#warnAfterNumberOfUnconfirmedAttempts]]
*/
@SerialVersionUID(1L)
case class UnconfirmedWarning(unconfirmedDeliveries: immutable.Seq[UnconfirmedDelivery]) {
/**
* Java API
*/
def getUnconfirmedDeliveries: java.util.List[UnconfirmedDelivery] = {
import scala.collection.JavaConverters._
unconfirmedDeliveries.asJava
}
}
/**
* Information about a message that has not been confirmed. Included in [[UnconfirmedWarning]]
* and [[AtLeastOnceDeliverySnapshot]].
*/
case class UnconfirmedDelivery(deliveryId: Long, destination: ActorPath, message: Any) {
/**
* Java API
*/
def getMessage(): AnyRef = message.asInstanceOf[AnyRef]
}
/**
* @see [[AtLeastOnceDelivery#maxUnconfirmedMessages]]
*/
class MaxUnconfirmedMessagesExceededException(message: String) extends RuntimeException(message)
private object Internal {
case class Delivery(destination: ActorPath, message: Any, timestamp: Long, attempt: Int)
case object RedeliveryTick
}
}
/**
* Use this trait with your `PersistentActor` to send messages with at-least-once
* delivery semantics to destinations. It takes care of re-sending messages when they
* have not been confirmed within a configurable timeout. Use the [[#deliver]] method to
* send a message to a destination. Call the [[#confirmDelivery]] method when the destination
* has replied with a confirmation message.
*
* At-least-once delivery implies that original message send order is not always retained
* and the destination may receive duplicate messages due to possible resends.
*
* The interval between redelivery attempts can be defined by [[#redeliverInterval]].
* After a number of delivery attempts a [[AtLeastOnceDelivery.UnconfirmedWarning]] message
* will be sent to `self`. The re-sending will still continue, but you can choose to call
* [[#confirmDelivery]] to cancel the re-sending.
*
* The `AtLeastOnceDelivery` trait has a state consisting of unconfirmed messages and a
* sequence number. It does not store this state itself. You must persist events corresponding
* to the `deliver` and `confirmDelivery` invocations from your `PersistentActor` so that the
* state can be restored by calling the same methods during the recovery phase of the
* `PersistentActor`. Sometimes these events can be derived from other business level events,
* and sometimes you must create separate events. During recovery calls to `delivery`
* will not send out the message, but it will be sent later if no matching `confirmDelivery`
* was performed.
*
* Support for snapshots is provided by [[#getDeliverySnapshot]] and [[#setDeliverySnapshot]].
* The `AtLeastOnceDeliverySnapshot` contains the full delivery state, including unconfirmed messages.
* If you need a custom snapshot for other parts of the actor state you must also include the
* `AtLeastOnceDeliverySnapshot`. It is serialized using protobuf with the ordinary Akka
* serialization mechanism. It is easiest to include the bytes of the `AtLeastOnceDeliverySnapshot`
* as a blob in your custom snapshot.
*/
trait AtLeastOnceDelivery extends Processor {
// FIXME The reason for extending Processor instead of PersistentActor is
// the class hierarchy for UntypedPersistentActorWithAtLeastOnceDelivery
import AtLeastOnceDelivery._
import AtLeastOnceDelivery.Internal._
/**
* Interval between redelivery attempts.
*
* The default value can be configured with the
* `akka.persistence.at-least-once-delivery.redeliver-interval`
* configuration key. This method can be overridden by implementation classes to return
* non-default values.
*/
def redeliverInterval: FiniteDuration = defaultRedeliverInterval
private val defaultRedeliverInterval: FiniteDuration =
Persistence(context.system).settings.atLeastOnceDelivery.redeliverInterval
/**
* After this number of delivery attempts a [[AtLeastOnceDelivery.UnconfirmedWarning]] message
* will be sent to `self`. The count is reset after a restart.
*
* The default value can be configured with the
* `akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts`
* configuration key. This method can be overridden by implementation classes to return
* non-default values.
*/
def warnAfterNumberOfUnconfirmedAttempts: Int = defaultWarnAfterNumberOfUnconfirmedAttempts
private val defaultWarnAfterNumberOfUnconfirmedAttempts: Int =
Persistence(context.system).settings.atLeastOnceDelivery.warnAfterNumberOfUnconfirmedAttempts
/**
* Maximum number of unconfirmed messages that this actor is allowed to hold in memory.
* If this number is exceed [[#deliver]] will not accept more messages and it will throw
* [[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException]].
*
* The default value can be configured with the
* `akka.persistence.at-least-once-delivery.max-unconfirmed-messages`
* configuration key. This method can be overridden by implementation classes to return
* non-default values.
*/
def maxUnconfirmedMessages: Int = defaultMaxUnconfirmedMessages
private val defaultMaxUnconfirmedMessages: Int =
Persistence(context.system).settings.atLeastOnceDelivery.maxUnconfirmedMessages
private val redeliverTask = {
import context.dispatcher
val interval = redeliverInterval / 2
context.system.scheduler.schedule(interval, interval, self, RedeliveryTick)
}
private var deliverySequenceNr = 0L
private var unconfirmed = immutable.SortedMap.empty[Long, Delivery]
private def nextDeliverySequenceNr(): Long = {
deliverySequenceNr += 1
deliverySequenceNr
}
/**
* Scala API: Send the message created by the `deliveryIdToMessage` function to
* the `destination` actor. It will retry sending the message until
* the delivery is confirmed with [[#confirmDelivery]]. Correlation
* between `deliver` and `confirmDelivery` is performed with the
* `deliveryId` that is provided as parameter to the `deliveryIdToMessage`
* function. The `deliveryId` is typically passed in the message to the
* destination, which replies with a message containing the same `deliveryId`.
*
* The `deliveryId` is a strictly monotonically increasing sequence number without
* gaps. The same sequence is used for all destinations of the actor, i.e. when sending
* to multiple destinations the destinations will see gaps in the sequence if no
* translation is performed.
*
* During recovery this method will not send out the message, but it will be sent
* later if no matching `confirmDelivery` was performed.
*
* This method will throw [[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException]]
* if [[numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]].
*/
def deliver(destination: ActorPath, deliveryIdToMessage: Long Any): Unit = {
if (unconfirmed.size >= maxUnconfirmedMessages)
throw new MaxUnconfirmedMessagesExceededException(
s"Too many unconfirmed messages, maximum allowed is [$maxUnconfirmedMessages]")
val deliveryId = nextDeliverySequenceNr()
val now = System.nanoTime()
val d = Delivery(destination, deliveryIdToMessage(deliveryId), now, attempt = 0)
if (recoveryRunning)
unconfirmed = unconfirmed.updated(deliveryId, d)
else
send(deliveryId, d, now)
}
/**
* Call this method when a message has been confirmed by the destination,
* or to abort re-sending.
* @see [[#deliver]]
* @return `true` the first time the `deliveryId` is confirmed, i.e. `false` for duplicate confirm
*/
def confirmDelivery(deliveryId: Long): Boolean = {
if (unconfirmed.contains(deliveryId)) {
unconfirmed -= deliveryId
true
} else false
}
/**
* Number of messages that have not been confirmed yet.
*/
def numberOfUnconfirmed: Int = unconfirmed.size
private def redeliverOverdue(): Unit = {
val now = System.nanoTime()
val deadline = now - redeliverInterval.toNanos
var warnings = Vector.empty[UnconfirmedDelivery]
unconfirmed foreach {
case (deliveryId, delivery)
if (delivery.timestamp <= deadline) {
send(deliveryId, delivery, now)
if (delivery.attempt == warnAfterNumberOfUnconfirmedAttempts)
warnings :+= UnconfirmedDelivery(deliveryId, delivery.destination, delivery.message)
}
}
if (warnings.nonEmpty)
self ! UnconfirmedWarning(warnings)
}
private def send(deliveryId: Long, d: Delivery, timestamp: Long): Unit = {
context.actorSelection(d.destination) ! d.message
unconfirmed = unconfirmed.updated(deliveryId, d.copy(timestamp = timestamp, attempt = d.attempt + 1))
}
/**
* Full state of the `AtLeastOnceDelivery`. It can be saved with [[PersistentActor#saveSnapshot]].
* During recovery the snapshot received in [[SnapshotOffer]] should be set
* with [[#setDeliverySnapshot]].
*
* The `AtLeastOnceDeliverySnapshot` contains the full delivery state, including unconfirmed messages.
* If you need a custom snapshot for other parts of the actor state you must also include the
* `AtLeastOnceDeliverySnapshot`. It is serialized using protobuf with the ordinary Akka
* serialization mechanism. It is easiest to include the bytes of the `AtLeastOnceDeliverySnapshot`
* as a blob in your custom snapshot.
*/
def getDeliverySnapshot: AtLeastOnceDeliverySnapshot =
AtLeastOnceDeliverySnapshot(deliverySequenceNr,
unconfirmed.map { case (deliveryId, d) UnconfirmedDelivery(deliveryId, d.destination, d.message) }(breakOut))
/**
* If snapshot from [[#getDeliverySnapshot]] was saved it will be received during recovery
* in a [[SnapshotOffer]] message and should be set with this method.
*/
def setDeliverySnapshot(snapshot: AtLeastOnceDeliverySnapshot): Unit = {
deliverySequenceNr = snapshot.currentDeliveryId
val now = System.nanoTime()
unconfirmed = snapshot.unconfirmedDeliveries.map(d
d.deliveryId -> Delivery(d.destination, d.message, now, 0))(breakOut)
}
/**
* INTERNAL API
*/
override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = {
redeliverTask.cancel()
super.aroundPreRestart(reason, message)
}
/**
* INTERNAL API
*/
override protected[akka] def aroundPostStop(): Unit = {
redeliverTask.cancel()
super.aroundPostStop()
}
/**
* INTERNAL API
*/
override protected[akka] def aroundReceive(receive: Receive, message: Any): Unit =
message match {
case RedeliveryTick redeliverOverdue()
case _ super.aroundReceive(receive, message)
}
}
/**
* Java API: Use this class instead of `UntypedPersistentActor` to send messages
* with at-least-once delivery semantics to destinations.
* Full documentation in [[AtLeastOnceDelivery]].
*
* @see [[AtLeastOnceDelivery]]
*/
abstract class UntypedPersistentActorWithAtLeastOnceDelivery extends UntypedPersistentActor with AtLeastOnceDelivery {
/**
* Java API: Send the message created by the `deliveryIdToMessage` function to
* the `destination` actor. It will retry sending the message until
* the delivery is confirmed with [[#confirmDelivery]]. Correlation
* between `deliver` and `confirmDelivery` is performed with the
* `deliveryId` that is provided as parameter to the `deliveryIdToMessage`
* function. The `deliveryId` is typically passed in the message to the
* destination, which replies with a message containing the same `deliveryId`.
*
* The `deliveryId` is a strictly monotonically increasing sequence number without
* gaps. The same sequence is used for all destinations, i.e. when sending to
* multiple destinations the destinations will see gaps in the sequence if no
* translation is performed.
*
* During recovery this method will not send out the message, but it will be sent
* later if no matching `confirmDelivery` was performed.
*
* This method will throw [[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException]]
* if [[numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]].
*/
def deliver(destination: ActorPath, deliveryIdToMessage: akka.japi.Function[java.lang.Long, Object]): Unit =
super.deliver(destination, id deliveryIdToMessage.apply(id))
}
/**
* Java API compatible with lambda expressions
*
* Use this class instead of `UntypedPersistentActor` to send messages
* with at-least-once delivery semantics to destinations.
* Full documentation in [[AtLeastOnceDelivery]].
*
* @see [[AtLeastOnceDelivery]]
*/
abstract class AbstractPersistentActorWithAtLeastOnceDelivery extends AbstractPersistentActor with AtLeastOnceDelivery {
/**
* Java API: Send the message created by the `deliveryIdToMessage` function to
* the `destination` actor. It will retry sending the message until
* the delivery is confirmed with [[#confirmDelivery]]. Correlation
* between `deliver` and `confirmDelivery` is performed with the
* `deliveryId` that is provided as parameter to the `deliveryIdToMessage`
* function. The `deliveryId` is typically passed in the message to the
* destination, which replies with a message containing the same `deliveryId`.
*
* The `deliveryId` is a strictly monotonically increasing sequence number without
* gaps. The same sequence is used for all destinations, i.e. when sending to
* multiple destinations the destinations will see gaps in the sequence if no
* translation is performed.
*
* During recovery this method will not send out the message, but it will be sent
* later if no matching `confirmDelivery` was performed.
*
* This method will throw [[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException]]
* if [[numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]].
*/
def deliver(destination: ActorPath, deliveryIdToMessage: akka.japi.Function[java.lang.Long, Object]): Unit =
super.deliver(destination, id deliveryIdToMessage.apply(id))
}

View file

@ -345,7 +345,7 @@ private[persistence] trait Eventsourced extends ProcessorImpl {
/**
* INTERNAL API.
*/
final override protected[akka] def aroundReceive(receive: Receive, message: Any) {
override protected[akka] def aroundReceive(receive: Receive, message: Any) {
currentState.aroundReceive(receive, message)
}

View file

@ -42,6 +42,18 @@ final class PersistenceSettings(config: Config) {
if (v < 0) Long.MaxValue else v
}
object atLeastOnceDelivery {
val redeliverInterval: FiniteDuration =
config.getMillisDuration("at-least-once-delivery.redeliver-interval")
val warnAfterNumberOfUnconfirmedAttempts: Int =
config.getInt("at-least-once-delivery.warn-after-number-of-unconfirmed-attempts")
val maxUnconfirmedMessages: Int =
config.getInt("at-least-once-delivery.max-unconfirmed-messages")
}
/**
* INTERNAL API.
*

View file

@ -245,21 +245,21 @@ private[akka] trait ProcessorImpl extends Actor with Recovery {
/**
* INTERNAL API.
*/
final override protected[akka] def aroundPreStart(): Unit = {
override protected[akka] def aroundPreStart(): Unit = {
try preStart() finally super.preStart()
}
/**
* INTERNAL API.
*/
final override protected[akka] def aroundPostStop(): Unit = {
override protected[akka] def aroundPostStop(): Unit = {
try unstashAll(unstashFilterPredicate) finally postStop()
}
/**
* INTERNAL API.
*/
final override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = {
override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = {
try {
receiverStash.prepend(processorBatch.map(p Envelope(p, p.sender, context.system)))
receiverStash.unstashAll()

View file

@ -5,14 +5,15 @@
package akka.persistence.serialization
import scala.language.existentials
import com.google.protobuf._
import akka.actor.{ ActorPath, ExtendedActorSystem }
import akka.japi.Util.immutableSeq
import akka.persistence._
import akka.persistence.serialization.MessageFormats._
import akka.serialization._
import akka.persistence.AtLeastOnceDelivery.{ AtLeastOnceDeliverySnapshot AtLeastOnceDeliverySnap }
import akka.persistence.AtLeastOnceDelivery.UnconfirmedDelivery
import scala.collection.immutable.VectorBuilder
/**
* Marker trait for all protobuf-serializable messages in `akka.persistence`.
@ -32,6 +33,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
val DeliveredByTransientChannelClass = classOf[DeliveredByChannel]
val DeliveredByPersistentChannelClass = classOf[DeliveredByPersistentChannel]
val DeliverClass = classOf[Deliver]
val AtLeastOnceDeliverySnapshotClass = classOf[AtLeastOnceDeliverySnap]
def identifier: Int = 7
def includeManifest: Boolean = true
@ -52,6 +54,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
case c: DeliveredByChannel deliveredMessageBuilder(c).build().toByteArray
case c: DeliveredByPersistentChannel deliveredMessageBuilder(c).build().toByteArray
case d: Deliver deliverMessageBuilder(d).build.toByteArray
case a: AtLeastOnceDeliverySnap atLeastOnceDeliverySnapshotBuilder(a).build.toByteArray
case _ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}")
}
@ -69,6 +72,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
case DeliveredByTransientChannelClass delivered(DeliveredMessage.parseFrom(bytes))
case DeliveredByPersistentChannelClass delivered(DeliveredMessage.parseFrom(bytes))
case DeliverClass deliver(DeliverMessage.parseFrom(bytes))
case AtLeastOnceDeliverySnapshotClass atLeastOnceDeliverySnapshot(AtLeastOnceDeliverySnapshot.parseFrom(bytes))
case _ throw new IllegalArgumentException(s"Can't deserialize object of type ${c}")
}
}
@ -84,6 +88,33 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
builder
}
def atLeastOnceDeliverySnapshotBuilder(snap: AtLeastOnceDeliverySnap): AtLeastOnceDeliverySnapshot.Builder = {
val builder = AtLeastOnceDeliverySnapshot.newBuilder
builder.setCurrentDeliveryId(snap.currentDeliveryId)
snap.unconfirmedDeliveries.foreach { unconfirmed
val unconfirmedBuilder =
AtLeastOnceDeliverySnapshot.UnconfirmedDelivery.newBuilder.
setDeliveryId(unconfirmed.deliveryId).
setDestination(unconfirmed.destination.toString).
setPayload(persistentPayloadBuilder(unconfirmed.message.asInstanceOf[AnyRef]))
builder.addUnconfirmedDeliveries(unconfirmedBuilder)
}
builder
}
def atLeastOnceDeliverySnapshot(atLeastOnceDeliverySnapshot: AtLeastOnceDeliverySnapshot): AtLeastOnceDeliverySnap = {
import scala.collection.JavaConverters._
val unconfirmedDeliveries = new VectorBuilder[UnconfirmedDelivery]()
atLeastOnceDeliverySnapshot.getUnconfirmedDeliveriesList().iterator().asScala foreach { next
unconfirmedDeliveries += UnconfirmedDelivery(next.getDeliveryId, ActorPath.fromString(next.getDestination),
payload(next.getPayload))
}
AtLeastOnceDeliverySnap(
atLeastOnceDeliverySnapshot.getCurrentDeliveryId,
unconfirmedDeliveries.result())
}
private def persistentMessageBatchBuilder(persistentBatch: PersistentBatch) = {
val builder = PersistentMessageBatch.newBuilder
persistentBatch.batch.

View file

@ -0,0 +1,177 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.language.postfixOps
import com.typesafe.config.ConfigFactory
import akka.actor._
import akka.testkit._
object AtLeastOnceDeliveryFailureSpec {
val config = ConfigFactory.parseString(
s"""
akka.persistence.sender.chaos.live-processing-failure-rate = 0.3
akka.persistence.sender.chaos.replay-processing-failure-rate = 0.1
akka.persistence.destination.chaos.confirm-failure-rate = 0.3
akka.persistence.journal.plugin = "akka.persistence.journal.chaos"
akka.persistence.journal.chaos.write-failure-rate = 0.3
akka.persistence.journal.chaos.confirm-failure-rate = 0.2
akka.persistence.journal.chaos.delete-failure-rate = 0.3
akka.persistence.journal.chaos.replay-failure-rate = 0.25
akka.persistence.journal.chaos.read-highest-failure-rate = 0.1
akka.persistence.journal.chaos.class = akka.persistence.journal.chaos.ChaosJournal
akka.persistence.snapshot-store.local.dir = "target/snapshots-at-least-once-delivery-failure-spec/"
""")
val numMessages = 10
case object Start
case class Done(ints: Vector[Int])
case class ProcessingFailure(i: Int)
case class JournalingFailure(i: Int)
case class Msg(deliveryId: Long, i: Int)
case class Confirm(deliveryId: Long, i: Int)
sealed trait Evt
case class MsgSent(i: Int) extends Evt
case class MsgConfirmed(deliveryId: Long, i: Int) extends Evt
trait ChaosSupport { this: Actor
def random = ThreadLocalRandom.current
def probe: ActorRef
var state = Vector.empty[Int]
def contains(i: Int): Boolean =
state.contains(i)
def add(i: Int): Unit = {
state :+= i
if (state.length == numMessages) probe ! Done(state)
}
def shouldFail(rate: Double) =
random.nextDouble() < rate
}
class ChaosSender(destination: ActorRef, val probe: ActorRef) extends PersistentActor with ChaosSupport with ActorLogging with AtLeastOnceDelivery {
val config = context.system.settings.config.getConfig("akka.persistence.sender.chaos")
val liveProcessingFailureRate = config.getDouble("live-processing-failure-rate")
val replayProcessingFailureRate = config.getDouble("replay-processing-failure-rate")
override def redeliverInterval = 500.milliseconds
override def processorId = "chaosSender"
def receiveCommand: Receive = {
case i: Int
val failureRate = if (recoveryRunning) replayProcessingFailureRate else liveProcessingFailureRate
if (contains(i)) {
log.debug(debugMessage(s"ignored duplicate ${i}"))
} else {
persist(MsgSent(i)) { evt
updateState(evt)
if (shouldFail(failureRate))
throw new TestException(debugMessage(s"failed at payload ${i}"))
else
log.debug(debugMessage(s"processed payload ${i}"))
}
}
case Confirm(deliveryId, i) persist(MsgConfirmed(deliveryId, i))(updateState)
case PersistenceFailure(MsgSent(i), _, _)
// inform sender about journaling failure so that it can resend
sender() ! JournalingFailure(i)
case PersistenceFailure(MsgConfirmed(_, i), _, _)
// ok, will be redelivered
}
def receiveRecover: Receive = {
case evt: Evt updateState(evt)
case RecoveryFailure(_)
// journal failed during recovery, throw exception to re-recover processor
throw new TestException(debugMessage("recovery failed"))
}
def updateState(evt: Evt): Unit = evt match {
case MsgSent(i)
add(i)
deliver(destination.path, deliveryId Msg(deliveryId, i))
case MsgConfirmed(deliveryId, i)
confirmDelivery(deliveryId)
}
private def debugMessage(msg: String): String =
s"[sender] ${msg} (mode = ${if (recoveryRunning) "replay" else "live"} snr = ${lastSequenceNr} state = ${state.sorted})"
}
class ChaosDestination(val probe: ActorRef) extends Actor with ChaosSupport with ActorLogging {
val config = context.system.settings.config.getConfig("akka.persistence.destination.chaos")
val confirmFailureRate = config.getDouble("confirm-failure-rate")
def receive = {
case m @ Msg(deliveryId, i)
if (shouldFail(confirmFailureRate)) {
log.error(debugMessage("confirm message failed", m))
} else if (contains(i)) {
log.debug(debugMessage("ignored duplicate", m))
sender() ! Confirm(deliveryId, i)
} else {
add(i)
sender() ! Confirm(deliveryId, i)
log.debug(debugMessage("received and confirmed message", m))
}
}
private def debugMessage(msg: String, m: Msg): String =
s"[destination] ${msg} (message = $m)"
}
class ChaosApp(probe: ActorRef) extends Actor with ActorLogging {
val destination = context.actorOf(Props(classOf[ChaosDestination], probe), "destination")
val snd = context.actorOf(Props(classOf[ChaosSender], destination, probe), "sender")
def receive = {
case Start 1 to numMessages foreach (snd ! _)
case ProcessingFailure(i)
snd ! i
log.debug(s"resent ${i} after processing failure")
case JournalingFailure(i)
snd ! i
log.debug(s"resent ${i} after journaling failure")
}
}
}
class AtLeastOnceDeliveryFailureSpec extends AkkaSpec(AtLeastOnceDeliveryFailureSpec.config) with Cleanup with ImplicitSender {
import AtLeastOnceDeliveryFailureSpec._
"AtLeastOnceDelivery" must {
"tolerate and recover from random failures" in {
system.actorOf(Props(classOf[ChaosApp], testActor), "chaosApp") ! Start
expectDone() // by sender
expectDone() // by destination
system.actorOf(Props(classOf[ChaosApp], testActor), "chaosApp2") // recovery of new instance should have same outcome
expectDone() // by sender
// destination doesn't receive messages again because all have been confirmed already
}
}
def expectDone() = within(numMessages.seconds) {
expectMsgType[Done].ints.sorted should be(1 to numMessages toVector)
}
}

View file

@ -0,0 +1,293 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import com.typesafe.config._
import akka.actor._
import akka.testkit._
import akka.persistence.AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot
import akka.persistence.AtLeastOnceDelivery.UnconfirmedWarning
import akka.persistence.AtLeastOnceDelivery.UnconfirmedWarning
object AtLeastOnceDeliverySpec {
case class Req(payload: String)
case object ReqAck
case object InvalidReq
sealed trait Evt
case class AcceptedReq(payload: String, destination: ActorPath) extends Evt
case class ReqDone(id: Long) extends Evt
case class Action(id: Long, payload: String)
case class ActionAck(id: Long)
case object Boom
case object SaveSnap
case class Snap(deliverySnapshot: AtLeastOnceDeliverySnapshot) // typically includes some user data as well
def senderProps(testActor: ActorRef, name: String,
redeliverInterval: FiniteDuration, warnAfterNumberOfUnconfirmedAttempts: Int,
async: Boolean, destinations: Map[String, ActorPath]): Props =
Props(new Sender(testActor, name, redeliverInterval, warnAfterNumberOfUnconfirmedAttempts, async, destinations))
class Sender(testActor: ActorRef,
name: String,
override val redeliverInterval: FiniteDuration,
override val warnAfterNumberOfUnconfirmedAttempts: Int,
async: Boolean,
destinations: Map[String, ActorPath])
extends PersistentActor with AtLeastOnceDelivery with ActorLogging {
override def processorId: String = name
def updateState(evt: Evt): Unit = evt match {
case AcceptedReq(payload, destination)
deliver(destination, deliveryId Action(deliveryId, payload))
case ReqDone(id)
confirmDelivery(id)
}
val receiveCommand: Receive = {
case Req(payload)
if (payload.isEmpty)
sender() ! InvalidReq
else {
val destination = destinations(payload.take(1).toUpperCase)
if (async)
persistAsync(AcceptedReq(payload, destination)) { evt
updateState(evt)
sender() ! ReqAck
}
else
persist(AcceptedReq(payload, destination)) { evt
updateState(evt)
sender() ! ReqAck
}
}
case ActionAck(id)
log.debug("Sender got ack {}", id)
if (confirmDelivery(id))
if (async)
persistAsync(ReqDone(id)) { evt updateState(evt) }
else
persist(ReqDone(id)) { evt updateState(evt) }
case Boom
throw new RuntimeException("boom") with NoStackTrace
case SaveSnap
saveSnapshot(Snap(getDeliverySnapshot))
case w: UnconfirmedWarning
testActor ! w
}
def receiveRecover: Receive = {
case evt: Evt updateState(evt)
case SnapshotOffer(_, Snap(deliverySnapshot))
setDeliverySnapshot(deliverySnapshot)
}
}
def destinationProps(testActor: ActorRef): Props =
Props(new Destination(testActor))
class Destination(testActor: ActorRef) extends Actor with ActorLogging {
var allReceived = Set.empty[Long]
def receive = {
case a @ Action(id, payload)
// discard duplicates (naive impl)
if (!allReceived.contains(id)) {
log.debug("Destination got {}, all count {}", a, allReceived.size + 1)
testActor ! a
allReceived += id
}
sender() ! ActionAck(id)
}
}
def unreliableProps(dropMod: Int, target: ActorRef): Props =
Props(new Unreliable(dropMod, target))
class Unreliable(dropMod: Int, target: ActorRef) extends Actor with ActorLogging {
var count = 0
def receive = {
case msg
count += 1
if (count % dropMod != 0) {
log.debug("Pass msg {} count {}", msg, count)
target forward msg
} else {
log.debug("Drop msg {} count {}", msg, count)
}
}
}
}
abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender {
import AtLeastOnceDeliverySpec._
"AtLeastOnceDelivery" must {
"deliver messages in order when nothing is lost" in {
val probeA = TestProbe()
val destinations = Map("A" -> system.actorOf(destinationProps(probeA.ref)).path)
val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, async = false, destinations), name)
snd ! Req("a")
expectMsg(ReqAck)
probeA.expectMsg(Action(1, "a"))
probeA.expectNoMsg(1.second)
}
"re-deliver lost messages" in {
val probeA = TestProbe()
val dst = system.actorOf(destinationProps(probeA.ref))
val destinations = Map("A" -> system.actorOf(unreliableProps(3, dst)).path)
val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, async = false, destinations), name)
snd ! Req("a-1")
expectMsg(ReqAck)
probeA.expectMsg(Action(1, "a-1"))
snd ! Req("a-2")
expectMsg(ReqAck)
probeA.expectMsg(Action(2, "a-2"))
snd ! Req("a-3")
snd ! Req("a-4")
expectMsg(ReqAck)
expectMsg(ReqAck)
// a-3 was lost
probeA.expectMsg(Action(4, "a-4"))
// and then re-delivered
probeA.expectMsg(Action(3, "a-3"))
probeA.expectNoMsg(1.second)
}
"re-deliver lost messages after restart" in {
val probeA = TestProbe()
val dst = system.actorOf(destinationProps(probeA.ref))
val destinations = Map("A" -> system.actorOf(unreliableProps(3, dst)).path)
val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, async = false, destinations), name)
snd ! Req("a-1")
expectMsg(ReqAck)
probeA.expectMsg(Action(1, "a-1"))
snd ! Req("a-2")
expectMsg(ReqAck)
probeA.expectMsg(Action(2, "a-2"))
snd ! Req("a-3")
snd ! Req("a-4")
expectMsg(ReqAck)
expectMsg(ReqAck)
// a-3 was lost
probeA.expectMsg(Action(4, "a-4"))
// trigger restart
snd ! Boom
// and then re-delivered
probeA.expectMsg(Action(3, "a-3"))
snd ! Req("a-5")
expectMsg(ReqAck)
probeA.expectMsg(Action(5, "a-5"))
probeA.expectNoMsg(1.second)
}
"restore state from snapshot" in {
val probeA = TestProbe()
val dst = system.actorOf(destinationProps(probeA.ref))
val destinations = Map("A" -> system.actorOf(unreliableProps(3, dst)).path)
val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, async = false, destinations), name)
snd ! Req("a-1")
expectMsg(ReqAck)
probeA.expectMsg(Action(1, "a-1"))
snd ! Req("a-2")
expectMsg(ReqAck)
probeA.expectMsg(Action(2, "a-2"))
snd ! Req("a-3")
snd ! Req("a-4")
snd ! SaveSnap
expectMsg(ReqAck)
expectMsg(ReqAck)
// a-3 was lost
probeA.expectMsg(Action(4, "a-4"))
// trigger restart
snd ! Boom
// and then re-delivered
probeA.expectMsg(Action(3, "a-3"))
snd ! Req("a-5")
expectMsg(ReqAck)
probeA.expectMsg(Action(5, "a-5"))
probeA.expectNoMsg(1.second)
}
"warn about unconfirmed messages" in {
val probeA = TestProbe()
val probeB = TestProbe()
val destinations = Map("A" -> probeA.ref.path, "B" -> probeB.ref.path)
val snd = system.actorOf(senderProps(testActor, name, 500.millis, 3, async = false, destinations), name)
snd ! Req("a-1")
snd ! Req("b-1")
snd ! Req("b-2")
expectMsg(ReqAck)
expectMsg(ReqAck)
expectMsg(ReqAck)
val unconfirmed = receiveWhile(3.seconds) {
case UnconfirmedWarning(unconfirmed) unconfirmed
}.flatten
unconfirmed.map(_.destination).toSet should be(Set(probeA.ref.path, probeB.ref.path))
unconfirmed.map(_.message).toSet should be(Set(Action(1, "a-1"), Action(2, "b-1"), Action(3, "b-2")))
system.stop(snd)
}
"re-deliver many lost messages" in {
val probeA = TestProbe()
val probeB = TestProbe()
val probeC = TestProbe()
val dstA = system.actorOf(destinationProps(probeA.ref), "destination-a")
val dstB = system.actorOf(destinationProps(probeB.ref), "destination-b")
val dstC = system.actorOf(destinationProps(probeC.ref), "destination-c")
val destinations = Map(
"A" -> system.actorOf(unreliableProps(2, dstA), "unreliable-a").path,
"B" -> system.actorOf(unreliableProps(5, dstB), "unreliable-b").path,
"C" -> system.actorOf(unreliableProps(3, dstC), "unreliable-c").path)
val snd = system.actorOf(senderProps(testActor, name, 1000.millis, 5, async = true, destinations), name)
val N = 100
for (n 1 to N) {
snd ! Req("a-" + n)
}
for (n 1 to N) {
snd ! Req("b-" + n)
}
for (n 1 to N) {
snd ! Req("c-" + n)
}
val deliverWithin = 20.seconds
probeA.receiveN(N, deliverWithin).map { case a: Action a.payload }.toSet should be((1 to N).map(n "a-" + n).toSet)
probeB.receiveN(N, deliverWithin).map { case a: Action a.payload }.toSet should be((1 to N).map(n "b-" + n).toSet)
probeC.receiveN(N, deliverWithin).map { case a: Action a.payload }.toSet should be((1 to N).map(n "c-" + n).toSet)
}
}
}
class LeveldbAtLeastOnceDeliverySpec extends AtLeastOnceDeliverySpec(PersistenceSpec.config("leveldb", "AtLeastOnceDeliverySpec"))
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class InmemAtLeastOnceDeliverySpec extends AtLeastOnceDeliverySpec(PersistenceSpec.config("inmem", "AtLeastOnceDeliverySpec"))

View file

@ -5,14 +5,15 @@
package akka.persistence.serialization
import scala.collection.immutable
import com.typesafe.config._
import akka.actor._
import akka.persistence._
import akka.serialization._
import akka.testkit._
import akka.persistence.AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot
import akka.persistence.AtLeastOnceDelivery.UnconfirmedDelivery
object SerializerSpecConfigs {
val customSerializers = ConfigFactory.parseString(
"""
@ -133,6 +134,35 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) {
deserialized should be(confirmation)
}
}
"given AtLeastOnceDeliverySnapshot" must {
"handle empty unconfirmed" in {
val unconfirmed = Vector.empty
val snap = AtLeastOnceDeliverySnapshot(13, unconfirmed)
val serializer = serialization.findSerializerFor(snap)
val bytes = serializer.toBinary(snap)
val deserialized = serializer.fromBinary(bytes, Some(classOf[AtLeastOnceDeliverySnapshot]))
deserialized should be(snap)
}
"handle a few unconfirmed" in {
val unconfirmed = Vector(
UnconfirmedDelivery(deliveryId = 1, destination = testActor.path, "a"),
UnconfirmedDelivery(deliveryId = 2, destination = testActor.path, "b"),
UnconfirmedDelivery(deliveryId = 3, destination = testActor.path, 42))
val snap = AtLeastOnceDeliverySnapshot(17, unconfirmed)
val serializer = serialization.findSerializerFor(snap)
val bytes = serializer.toBinary(snap)
val deserialized = serializer.fromBinary(bytes, Some(classOf[AtLeastOnceDeliverySnapshot]))
deserialized should be(snap)
}
}
}
}

View file

@ -5,6 +5,7 @@
package doc;
import akka.actor.AbstractActor;
import akka.actor.ActorPath;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
@ -14,6 +15,7 @@ import scala.Option;
import scala.PartialFunction;
import scala.concurrent.duration.Duration;
import scala.runtime.BoxedUnit;
import java.io.Serializable;
import java.util.concurrent.TimeUnit;
@ -170,6 +172,93 @@ public class LambdaPersistenceDocTest {
//#recovery-completed
};
static Object atLeastOnceExample = new Object() {
//#at-least-once-example
class Msg implements Serializable {
public final long deliveryId;
public final String s;
public Msg(long deliveryId, String s) {
this.deliveryId = deliveryId;
this.s = s;
}
}
class Confirm implements Serializable {
public final long deliveryId;
public Confirm(long deliveryId) {
this.deliveryId = deliveryId;
}
}
class MsgSent implements Serializable {
public final String s;
public MsgSent(String s) {
this.s = s;
}
}
class MsgConfirmed implements Serializable {
public final long deliveryId;
public MsgConfirmed(long deliveryId) {
this.deliveryId = deliveryId;
}
}
class MyPersistentActor extends AbstractPersistentActorWithAtLeastOnceDelivery {
private final ActorPath destination;
public MyPersistentActor(ActorPath destination) {
this.destination = destination;
}
@Override
public PartialFunction<Object, BoxedUnit> receiveCommand() {
return ReceiveBuilder.
match(String.class, s -> {
persist(new MsgSent(s), evt -> updateState(evt));
}).
match(Confirm.class, confirm -> {
persist(new MsgConfirmed(confirm.deliveryId), evt -> updateState(evt));
}).
build();
}
@Override
public PartialFunction<Object, BoxedUnit> receiveRecover() {
return ReceiveBuilder.
match(Object.class, evt -> updateState(evt)).build();
}
void updateState(Object event) {
if (event instanceof MsgSent) {
final MsgSent evt = (MsgSent) event;
deliver(destination, deliveryId -> new Msg(deliveryId, evt.s));
} else if (event instanceof MsgConfirmed) {
final MsgConfirmed evt = (MsgConfirmed) event;
confirmDelivery(evt.deliveryId);
}
}
}
class MyDestination extends AbstractActor {
public MyDestination() {
receive(ReceiveBuilder.
match(Msg.class, msg -> {
// ...
sender().tell(new Confirm(msg.deliveryId), self());
}).build()
);
}
}
//#at-least-once-example
};
static Object o3 = new Object() {
//#channel-example
class MyProcessor extends AbstractProcessor {