diff --git a/akka-docs/rst/general/message-delivery-reliability.rst b/akka-docs/rst/general/message-delivery-reliability.rst index fafb4dafcd..b45daa8d7f 100644 --- a/akka-docs/rst/general/message-delivery-reliability.rst +++ b/akka-docs/rst/general/message-delivery-reliability.rst @@ -287,13 +287,13 @@ delivery is an explicit ACK–RETRY protocol. In its simplest form this requires The third becomes necessary by virtue of the acknowledgements not being guaranteed to arrive either. An ACK-RETRY protocol with business-level acknowledgements is -supported by :ref:`channels` of the Akka Persistence module. Duplicates can be -detected by tracking the sequence numbers of messages received via channels. +supported by :ref:`at-least-once-delivery` of the Akka Persistence module. Duplicates can be +detected by tracking the identifiers of messages sent via :ref:`at-least-once-delivery`. Another way of implementing the third part would be to make processing the messages idempotent on the level of the business logic. Another example of implementing all three requirements is shown at -:ref:`reliable-proxy` (which is now superseded by :ref:`channels`). +:ref:`reliable-proxy` (which is now superseded by :ref:`at-least-once-delivery`). Event Sourcing -------------- diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java index 1a64c3110a..6c6344c4a4 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java @@ -4,14 +4,17 @@ package docs.persistence; +import akka.actor.ActorPath; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; import akka.actor.UntypedActor; +import akka.japi.Function; import akka.japi.Procedure; import akka.persistence.*; import scala.Option; import scala.concurrent.duration.Duration; +import java.io.Serializable; import java.util.concurrent.TimeUnit; @@ -162,6 +165,103 @@ public class PersistenceDocTest { //#recovery-completed } }; + + static Object atLeastOnceExample = new Object() { + //#at-least-once-example + + class Msg implements Serializable { + public final long deliveryId; + public final String s; + + public Msg(long deliveryId, String s) { + this.deliveryId = deliveryId; + this.s = s; + } + } + + class Confirm implements Serializable { + public final long deliveryId; + + public Confirm(long deliveryId) { + this.deliveryId = deliveryId; + } + } + + + class MsgSent implements Serializable { + public final String s; + + public MsgSent(String s) { + this.s = s; + } + } + class MsgConfirmed implements Serializable { + public final long deliveryId; + + public MsgConfirmed(long deliveryId) { + this.deliveryId = deliveryId; + } + } + + class MyPersistentActor extends UntypedPersistentActorWithAtLeastOnceDelivery { + private final ActorPath destination; + + public MyPersistentActor(ActorPath destination) { + this.destination = destination; + } + + public void onReceiveCommand(Object message) { + if (message instanceof String) { + String s = (String) message; + persist(new MsgSent(s), new Procedure() { + public void apply(MsgSent evt) { + updateState(evt); + } + }); + } else if (message instanceof Confirm) { + Confirm confirm = (Confirm) message; + persist(new MsgConfirmed(confirm.deliveryId), new Procedure() { + public void apply(MsgConfirmed evt) { + updateState(evt); + } + }); + } else { + unhandled(message); + } + } + + public void onReceiveRecover(Object event) { + updateState(event); + } + + void updateState(Object event) { + if (event instanceof MsgSent) { + final MsgSent evt = (MsgSent) event; + deliver(destination, new Function() { + public Object apply(Long deliveryId) { + return new Msg(deliveryId, evt.s); + } + }); + } else if (event instanceof MsgConfirmed) { + final MsgConfirmed evt = (MsgConfirmed) event; + confirmDelivery(evt.deliveryId); + } + } + } + + class MyDestination extends UntypedActor { + public void onReceive(Object message) throws Exception { + if (message instanceof Msg) { + Msg msg = (Msg) message; + // ... + getSender().tell(new Confirm(msg.deliveryId), getSelf()); + } else { + unhandled(message); + } + } + } + //#at-least-once-example + }; static Object o3 = new Object() { //#channel-example diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index cca5b9f18b..c308bf466d 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -370,6 +370,79 @@ A persistent actor can delete individual snapshots by calling the ``deleteSnapsh timestamp of a snapshot as argument. To bulk-delete snapshots matching ``SnapshotSelectionCriteria``, persistent actors should use the ``deleteSnapshots`` method. +.. _at-least-once-delivery-java-lambda: + +At-Least-Once Delivery +====================== + +To send messages with at-least-once delivery semantics to destinations you can extend the ``AbstractPersistentActorWithAtLeastOnceDelivery`` +class instead of ``AbstractPersistentActor`` on the sending side. It takes care of re-sending messages when they +have not been confirmed within a configurable timeout. + +.. note:: + + At-least-once delivery implies that original message send order is not always preserved + and the destination may receive duplicate messages. That means that the + semantics do not match those of a normal :class:`ActorRef` send operation: + + * it is not at-most-once delivery + + * message order for the same sender–receiver pair is not preserved due to + possible resends + + * after a crash and restart of the destination messages are still + delivered—to the new actor incarnation + + These semantics is similar to what an :class:`ActorPath` represents (see + :ref:`actor-lifecycle-scala`), therefore you need to supply a path and not a + reference when delivering messages. The messages are sent to the path with + an actor selection. + +Use the ``deliver`` method to send a message to a destination. Call the ``confirmDelivery`` method +when the destination has replied with a confirmation message. + +.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#at-least-once-example + +Correlation between ``deliver`` and ``confirmDelivery`` is performed with the ``deliveryId`` that is provided +as parameter to the ``deliveryIdToMessage`` function. The ``deliveryId`` is typically passed in the message to the +destination, which replies with a message containing the same ``deliveryId``. + +The ``deliveryId`` is a strictly monotonically increasing sequence number without gaps. The same sequence is +used for all destinations of the actor, i.e. when sending to multiple destinations the destinations will see +gaps in the sequence if no translation is performed. + +The ``AbstractPersistentActorWithAtLeastOnceDelivery`` class has a state consisting of unconfirmed messages and a +sequence number. It does not store this state itself. You must persist events corresponding to the +``deliver`` and ``confirmDelivery`` invocations from your ``PersistentActor`` so that the state can +be restored by calling the same methods during the recovery phase of the ``PersistentActor``. Sometimes +these events can be derived from other business level events, and sometimes you must create separate events. +During recovery calls to ``delivery`` will not send out the message, but it will be sent later +if no matching ``confirmDelivery`` was performed. + +Support for snapshots is provided by ``getDeliverySnapshot`` and ``setDeliverySnapshot``. +The ``AtLeastOnceDeliverySnapshot`` contains the full delivery state, including unconfirmed messages. +If you need a custom snapshot for other parts of the actor state you must also include the +``AtLeastOnceDeliverySnapshot``. It is serialized using protobuf with the ordinary Akka +serialization mechanism. It is easiest to include the bytes of the ``AtLeastOnceDeliverySnapshot`` +as a blob in your custom snapshot. + +The interval between redelivery attempts is defined by the ``redeliverInterval`` method. +The default value can be configured with the ``akka.persistence.at-least-once-delivery.redeliver-interval`` +configuration key. The method can be overridden by implementation classes to return non-default values. + +After a number of delivery attempts a ``AtLeastOnceDelivery.UnconfirmedWarning`` message +will be sent to ``self``. The re-sending will still continue, but you can choose to call +``confirmDelivery`` to cancel the re-sending. The number of delivery attempts before emitting the +warning is defined by the ``warnAfterNumberOfUnconfirmedAttempts`` method. The default value can be +configured with the ``akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts`` +configuration key. The method can be overridden by implementation classes to return non-default values. + +The ``AbstractPersistentActorWithAtLeastOnceDelivery`` class holds messages in memory until their successful delivery has been confirmed. +The limit of maximum number of unconfirmed messages that the actor is allowed to hold in memory +is defined by the ``maxUnconfirmedMessages`` method. If this limit is exceed the ``deliver`` method will +not accept more messages and it will throw ``AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException``. +The default value can be configured with the ``akka.persistence.at-least-once-delivery.max-unconfirmed-messages`` +configuration key. The method can be overridden by implementation classes to return non-default values. Storage plugins =============== diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 453b761295..6bd49c195f 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -376,6 +376,80 @@ A persistent actor can delete individual snapshots by calling the ``deleteSnapsh timestamp of a snapshot as argument. To bulk-delete snapshots matching ``SnapshotSelectionCriteria``, persistent actors should use the ``deleteSnapshots`` method. +.. _at-least-once-delivery-java: + +At-Least-Once Delivery +====================== + +To send messages with at-least-once delivery semantics to destinations you can extend the ``UntypedPersistentActorWithAtLeastOnceDelivery`` +class instead of ``UntypedPersistentActor`` on the sending side. It takes care of re-sending messages when they +have not been confirmed within a configurable timeout. + +.. note:: + + At-least-once delivery implies that original message send order is not always preserved + and the destination may receive duplicate messages. That means that the + semantics do not match those of a normal :class:`ActorRef` send operation: + + * it is not at-most-once delivery + + * message order for the same sender–receiver pair is not preserved due to + possible resends + + * after a crash and restart of the destination messages are still + delivered—to the new actor incarnation + + These semantics is similar to what an :class:`ActorPath` represents (see + :ref:`actor-lifecycle-scala`), therefore you need to supply a path and not a + reference when delivering messages. The messages are sent to the path with + an actor selection. + +Use the ``deliver`` method to send a message to a destination. Call the ``confirmDelivery`` method +when the destination has replied with a confirmation message. + +.. includecode:: code/docs/persistence/PersistenceDocTest.java#at-least-once-example + +Correlation between ``deliver`` and ``confirmDelivery`` is performed with the ``deliveryId`` that is provided +as parameter to the ``deliveryIdToMessage`` function. The ``deliveryId`` is typically passed in the message to the +destination, which replies with a message containing the same ``deliveryId``. + +The ``deliveryId`` is a strictly monotonically increasing sequence number without gaps. The same sequence is +used for all destinations of the actor, i.e. when sending to multiple destinations the destinations will see +gaps in the sequence if no translation is performed. + +The ``UntypedPersistentActorWithAtLeastOnceDelivery`` class has a state consisting of unconfirmed messages and a +sequence number. It does not store this state itself. You must persist events corresponding to the +``deliver`` and ``confirmDelivery`` invocations from your ``PersistentActor`` so that the state can +be restored by calling the same methods during the recovery phase of the ``PersistentActor``. Sometimes +these events can be derived from other business level events, and sometimes you must create separate events. +During recovery calls to ``delivery`` will not send out the message, but it will be sent later +if no matching ``confirmDelivery`` was performed. + +Support for snapshots is provided by ``getDeliverySnapshot`` and ``setDeliverySnapshot``. +The ``AtLeastOnceDeliverySnapshot`` contains the full delivery state, including unconfirmed messages. +If you need a custom snapshot for other parts of the actor state you must also include the +``AtLeastOnceDeliverySnapshot``. It is serialized using protobuf with the ordinary Akka +serialization mechanism. It is easiest to include the bytes of the ``AtLeastOnceDeliverySnapshot`` +as a blob in your custom snapshot. + +The interval between redelivery attempts is defined by the ``redeliverInterval`` method. +The default value can be configured with the ``akka.persistence.at-least-once-delivery.redeliver-interval`` +configuration key. The method can be overridden by implementation classes to return non-default values. + +After a number of delivery attempts a ``AtLeastOnceDelivery.UnconfirmedWarning`` message +will be sent to ``self``. The re-sending will still continue, but you can choose to call +``confirmDelivery`` to cancel the re-sending. The number of delivery attempts before emitting the +warning is defined by the ``warnAfterNumberOfUnconfirmedAttempts`` method. The default value can be +configured with the ``akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts`` +configuration key. The method can be overridden by implementation classes to return non-default values. + +The ``UntypedPersistentActorWithAtLeastOnceDelivery`` class holds messages in memory until their successful delivery has been confirmed. +The limit of maximum number of unconfirmed messages that the actor is allowed to hold in memory +is defined by the ``maxUnconfirmedMessages`` method. If this limit is exceed the ``deliver`` method will +not accept more messages and it will throw ``AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException``. +The default value can be configured with the ``akka.persistence.at-least-once-delivery.max-unconfirmed-messages`` +configuration key. The method can be overridden by implementation classes to return non-default values. + Storage plugins =============== diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala index 8d701da9a6..0e5bf8735d 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala @@ -9,7 +9,6 @@ import akka.persistence._ import scala.concurrent.duration._ import scala.language.postfixOps - trait PersistenceDocSpec { val config = """ @@ -133,6 +132,48 @@ trait PersistenceDocSpec { } } + new AnyRef { + //#at-least-once-example + import akka.actor.{ Actor, ActorPath, Props } + import akka.persistence.AtLeastOnceDelivery + + case class Msg(deliveryId: Long, s: String) + case class Confirm(deliveryId: Long) + + sealed trait Evt + case class MsgSent(s: String) extends Evt + case class MsgConfirmed(deliveryId: Long) extends Evt + + class MyPersistentActor(destination: ActorPath) + extends PersistentActor with AtLeastOnceDelivery { + + def receiveCommand: Receive = { + case s: String => persist(MsgSent(s))(updateState) + case Confirm(deliveryId) => persist(MsgConfirmed(deliveryId))(updateState) + } + + def receiveRecover: Receive = { + case evt: Evt => updateState(evt) + } + + def updateState(evt: Evt): Unit = evt match { + case MsgSent(s) => + deliver(destination, deliveryId => Msg(deliveryId, s)) + + case MsgConfirmed(deliveryId) => confirmDelivery(deliveryId) + } + } + + class MyDestination extends Actor { + def receive = { + case Msg(deliveryId, s) => + // ... + sender() ! Confirm(deliveryId) + } + } + //#at-least-once-example + } + new AnyRef { //#channel-example import akka.actor.{ Actor, Props } diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 9bc30a957c..8b4f832e92 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -409,6 +409,79 @@ A persistent actor can delete individual snapshots by calling the ``deleteSnapsh timestamp of a snapshot as argument. To bulk-delete snapshots matching ``SnapshotSelectionCriteria``, persistent actors should use the ``deleteSnapshots`` method. +.. _at-least-once-delivery: + +At-Least-Once Delivery +====================== + +To send messages with at-least-once delivery semantics to destinations you can add the ``AtLeastOnceDelivery`` +trait to your ``PersistentActor`` on the sending side. It takes care of re-sending messages when they +have not been confirmed within a configurable timeout. + +.. note:: + + At-least-once delivery implies that original message send order is not always preserved + and the destination may receive duplicate messages. That means that the + semantics do not match those of a normal :class:`ActorRef` send operation: + + * it is not at-most-once delivery + + * message order for the same sender–receiver pair is not preserved due to + possible resends + + * after a crash and restart of the destination messages are still + delivered—to the new actor incarnation + + These semantics is similar to what an :class:`ActorPath` represents (see + :ref:`actor-lifecycle-scala`), therefore you need to supply a path and not a + reference when delivering messages. The messages are sent to the path with + an actor selection. + +Use the ``deliver`` method to send a message to a destination. Call the ``confirmDelivery`` method +when the destination has replied with a confirmation message. + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#at-least-once-example + +Correlation between ``deliver`` and ``confirmDelivery`` is performed with the ``deliveryId`` that is provided +as parameter to the ``deliveryIdToMessage`` function. The ``deliveryId`` is typically passed in the message to the +destination, which replies with a message containing the same ``deliveryId``. + +The ``deliveryId`` is a strictly monotonically increasing sequence number without gaps. The same sequence is +used for all destinations of the actor, i.e. when sending to multiple destinations the destinations will see +gaps in the sequence if no translation is performed. + +The ``AtLeastOnceDelivery`` trait has a state consisting of unconfirmed messages and a +sequence number. It does not store this state itself. You must persist events corresponding to the +``deliver`` and ``confirmDelivery`` invocations from your ``PersistentActor`` so that the state can +be restored by calling the same methods during the recovery phase of the ``PersistentActor``. Sometimes +these events can be derived from other business level events, and sometimes you must create separate events. +During recovery calls to ``delivery`` will not send out the message, but it will be sent later +if no matching ``confirmDelivery`` was performed. + +Support for snapshots is provided by ``getDeliverySnapshot`` and ``setDeliverySnapshot``. +The ``AtLeastOnceDeliverySnapshot`` contains the full delivery state, including unconfirmed messages. +If you need a custom snapshot for other parts of the actor state you must also include the +``AtLeastOnceDeliverySnapshot``. It is serialized using protobuf with the ordinary Akka +serialization mechanism. It is easiest to include the bytes of the ``AtLeastOnceDeliverySnapshot`` +as a blob in your custom snapshot. + +The interval between redelivery attempts is defined by the ``redeliverInterval`` method. +The default value can be configured with the ``akka.persistence.at-least-once-delivery.redeliver-interval`` +configuration key. The method can be overridden by implementation classes to return non-default values. + +After a number of delivery attempts a ``AtLeastOnceDelivery.UnconfirmedWarning`` message +will be sent to ``self``. The re-sending will still continue, but you can choose to call +``confirmDelivery`` to cancel the re-sending. The number of delivery attempts before emitting the +warning is defined by the ``warnAfterNumberOfUnconfirmedAttempts`` method. The default value can be +configured with the ``akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts`` +configuration key. The method can be overridden by implementation classes to return non-default values. + +The ``AtLeastOnceDelivery`` trait holds messages in memory until their successful delivery has been confirmed. +The limit of maximum number of unconfirmed messages that the actor is allowed to hold in memory +is defined by the ``maxUnconfirmedMessages`` method. If this limit is exceed the ``deliver`` method will +not accept more messages and it will throw ``AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException``. +The default value can be configured with the ``akka.persistence.at-least-once-delivery.max-unconfirmed-messages`` +configuration key. The method can be overridden by implementation classes to return non-default values. .. _storage-plugins: diff --git a/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java b/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java index 1d5169b9d6..3171786369 100644 --- a/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java +++ b/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java @@ -4582,6 +4582,1560 @@ public final class MessageFormats { // @@protoc_insertion_point(class_scope:DeliverMessage) } + public interface AtLeastOnceDeliverySnapshotOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // required int64 currentDeliveryId = 1; + /** + * required int64 currentDeliveryId = 1; + */ + boolean hasCurrentDeliveryId(); + /** + * required int64 currentDeliveryId = 1; + */ + long getCurrentDeliveryId(); + + // repeated .AtLeastOnceDeliverySnapshot.UnconfirmedDelivery unconfirmedDeliveries = 2; + /** + * repeated .AtLeastOnceDeliverySnapshot.UnconfirmedDelivery unconfirmedDeliveries = 2; + */ + java.util.List + getUnconfirmedDeliveriesList(); + /** + * repeated .AtLeastOnceDeliverySnapshot.UnconfirmedDelivery unconfirmedDeliveries = 2; + */ + akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery getUnconfirmedDeliveries(int index); + /** + * repeated .AtLeastOnceDeliverySnapshot.UnconfirmedDelivery unconfirmedDeliveries = 2; + */ + int getUnconfirmedDeliveriesCount(); + /** + * repeated .AtLeastOnceDeliverySnapshot.UnconfirmedDelivery unconfirmedDeliveries = 2; + */ + java.util.List + getUnconfirmedDeliveriesOrBuilderList(); + /** + * repeated .AtLeastOnceDeliverySnapshot.UnconfirmedDelivery unconfirmedDeliveries = 2; + */ + akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDeliveryOrBuilder getUnconfirmedDeliveriesOrBuilder( + int index); + } + /** + * Protobuf type {@code AtLeastOnceDeliverySnapshot} + */ + public static final class AtLeastOnceDeliverySnapshot extends + com.google.protobuf.GeneratedMessage + implements AtLeastOnceDeliverySnapshotOrBuilder { + // Use AtLeastOnceDeliverySnapshot.newBuilder() to construct. + private AtLeastOnceDeliverySnapshot(com.google.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private AtLeastOnceDeliverySnapshot(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final AtLeastOnceDeliverySnapshot defaultInstance; + public static AtLeastOnceDeliverySnapshot getDefaultInstance() { + return defaultInstance; + } + + public AtLeastOnceDeliverySnapshot getDefaultInstanceForType() { + return defaultInstance; + } + + private final com.google.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private AtLeastOnceDeliverySnapshot( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 8: { + bitField0_ |= 0x00000001; + currentDeliveryId_ = input.readInt64(); + break; + } + case 18: { + if (!((mutable_bitField0_ & 0x00000002) == 0x00000002)) { + unconfirmedDeliveries_ = new java.util.ArrayList(); + mutable_bitField0_ |= 0x00000002; + } + unconfirmedDeliveries_.add(input.readMessage(akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery.PARSER, extensionRegistry)); + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + if (((mutable_bitField0_ & 0x00000002) == 0x00000002)) { + unconfirmedDeliveries_ = java.util.Collections.unmodifiableList(unconfirmedDeliveries_); + } + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.persistence.serialization.MessageFormats.internal_static_AtLeastOnceDeliverySnapshot_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.persistence.serialization.MessageFormats.internal_static_AtLeastOnceDeliverySnapshot_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.class, akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.Builder.class); + } + + public static com.google.protobuf.Parser PARSER = + new com.google.protobuf.AbstractParser() { + public AtLeastOnceDeliverySnapshot parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return new AtLeastOnceDeliverySnapshot(input, extensionRegistry); + } + }; + + @java.lang.Override + public com.google.protobuf.Parser getParserForType() { + return PARSER; + } + + public interface UnconfirmedDeliveryOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // required int64 deliveryId = 1; + /** + * required int64 deliveryId = 1; + */ + boolean hasDeliveryId(); + /** + * required int64 deliveryId = 1; + */ + long getDeliveryId(); + + // required string destination = 2; + /** + * required string destination = 2; + */ + boolean hasDestination(); + /** + * required string destination = 2; + */ + java.lang.String getDestination(); + /** + * required string destination = 2; + */ + com.google.protobuf.ByteString + getDestinationBytes(); + + // required .PersistentPayload payload = 3; + /** + * required .PersistentPayload payload = 3; + */ + boolean hasPayload(); + /** + * required .PersistentPayload payload = 3; + */ + akka.persistence.serialization.MessageFormats.PersistentPayload getPayload(); + /** + * required .PersistentPayload payload = 3; + */ + akka.persistence.serialization.MessageFormats.PersistentPayloadOrBuilder getPayloadOrBuilder(); + } + /** + * Protobuf type {@code AtLeastOnceDeliverySnapshot.UnconfirmedDelivery} + */ + public static final class UnconfirmedDelivery extends + com.google.protobuf.GeneratedMessage + implements UnconfirmedDeliveryOrBuilder { + // Use UnconfirmedDelivery.newBuilder() to construct. + private UnconfirmedDelivery(com.google.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private UnconfirmedDelivery(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final UnconfirmedDelivery defaultInstance; + public static UnconfirmedDelivery getDefaultInstance() { + return defaultInstance; + } + + public UnconfirmedDelivery getDefaultInstanceForType() { + return defaultInstance; + } + + private final com.google.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private UnconfirmedDelivery( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 8: { + bitField0_ |= 0x00000001; + deliveryId_ = input.readInt64(); + break; + } + case 18: { + bitField0_ |= 0x00000002; + destination_ = input.readBytes(); + break; + } + case 26: { + akka.persistence.serialization.MessageFormats.PersistentPayload.Builder subBuilder = null; + if (((bitField0_ & 0x00000004) == 0x00000004)) { + subBuilder = payload_.toBuilder(); + } + payload_ = input.readMessage(akka.persistence.serialization.MessageFormats.PersistentPayload.PARSER, extensionRegistry); + if (subBuilder != null) { + subBuilder.mergeFrom(payload_); + payload_ = subBuilder.buildPartial(); + } + bitField0_ |= 0x00000004; + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.persistence.serialization.MessageFormats.internal_static_AtLeastOnceDeliverySnapshot_UnconfirmedDelivery_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.persistence.serialization.MessageFormats.internal_static_AtLeastOnceDeliverySnapshot_UnconfirmedDelivery_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery.class, akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery.Builder.class); + } + + public static com.google.protobuf.Parser PARSER = + new com.google.protobuf.AbstractParser() { + public UnconfirmedDelivery parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return new UnconfirmedDelivery(input, extensionRegistry); + } + }; + + @java.lang.Override + public com.google.protobuf.Parser getParserForType() { + return PARSER; + } + + private int bitField0_; + // required int64 deliveryId = 1; + public static final int DELIVERYID_FIELD_NUMBER = 1; + private long deliveryId_; + /** + * required int64 deliveryId = 1; + */ + public boolean hasDeliveryId() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required int64 deliveryId = 1; + */ + public long getDeliveryId() { + return deliveryId_; + } + + // required string destination = 2; + public static final int DESTINATION_FIELD_NUMBER = 2; + private java.lang.Object destination_; + /** + * required string destination = 2; + */ + public boolean hasDestination() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * required string destination = 2; + */ + public java.lang.String getDestination() { + java.lang.Object ref = destination_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + destination_ = s; + } + return s; + } + } + /** + * required string destination = 2; + */ + public com.google.protobuf.ByteString + getDestinationBytes() { + java.lang.Object ref = destination_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + destination_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + // required .PersistentPayload payload = 3; + public static final int PAYLOAD_FIELD_NUMBER = 3; + private akka.persistence.serialization.MessageFormats.PersistentPayload payload_; + /** + * required .PersistentPayload payload = 3; + */ + public boolean hasPayload() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * required .PersistentPayload payload = 3; + */ + public akka.persistence.serialization.MessageFormats.PersistentPayload getPayload() { + return payload_; + } + /** + * required .PersistentPayload payload = 3; + */ + public akka.persistence.serialization.MessageFormats.PersistentPayloadOrBuilder getPayloadOrBuilder() { + return payload_; + } + + private void initFields() { + deliveryId_ = 0L; + destination_ = ""; + payload_ = akka.persistence.serialization.MessageFormats.PersistentPayload.getDefaultInstance(); + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasDeliveryId()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasDestination()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasPayload()) { + memoizedIsInitialized = 0; + return false; + } + if (!getPayload().isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeInt64(1, deliveryId_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeBytes(2, getDestinationBytes()); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeMessage(3, payload_); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeInt64Size(1, deliveryId_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(2, getDestinationBytes()); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(3, payload_); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code AtLeastOnceDeliverySnapshot.UnconfirmedDelivery} + */ + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDeliveryOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.persistence.serialization.MessageFormats.internal_static_AtLeastOnceDeliverySnapshot_UnconfirmedDelivery_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.persistence.serialization.MessageFormats.internal_static_AtLeastOnceDeliverySnapshot_UnconfirmedDelivery_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery.class, akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery.Builder.class); + } + + // Construct using akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + getPayloadFieldBuilder(); + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + deliveryId_ = 0L; + bitField0_ = (bitField0_ & ~0x00000001); + destination_ = ""; + bitField0_ = (bitField0_ & ~0x00000002); + if (payloadBuilder_ == null) { + payload_ = akka.persistence.serialization.MessageFormats.PersistentPayload.getDefaultInstance(); + } else { + payloadBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000004); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return akka.persistence.serialization.MessageFormats.internal_static_AtLeastOnceDeliverySnapshot_UnconfirmedDelivery_descriptor; + } + + public akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery getDefaultInstanceForType() { + return akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery.getDefaultInstance(); + } + + public akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery build() { + akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery buildPartial() { + akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery result = new akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.deliveryId_ = deliveryId_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.destination_ = destination_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + if (payloadBuilder_ == null) { + result.payload_ = payload_; + } else { + result.payload_ = payloadBuilder_.build(); + } + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery) { + return mergeFrom((akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery other) { + if (other == akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery.getDefaultInstance()) return this; + if (other.hasDeliveryId()) { + setDeliveryId(other.getDeliveryId()); + } + if (other.hasDestination()) { + bitField0_ |= 0x00000002; + destination_ = other.destination_; + onChanged(); + } + if (other.hasPayload()) { + mergePayload(other.getPayload()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasDeliveryId()) { + + return false; + } + if (!hasDestination()) { + + return false; + } + if (!hasPayload()) { + + return false; + } + if (!getPayload().isInitialized()) { + + return false; + } + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // required int64 deliveryId = 1; + private long deliveryId_ ; + /** + * required int64 deliveryId = 1; + */ + public boolean hasDeliveryId() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required int64 deliveryId = 1; + */ + public long getDeliveryId() { + return deliveryId_; + } + /** + * required int64 deliveryId = 1; + */ + public Builder setDeliveryId(long value) { + bitField0_ |= 0x00000001; + deliveryId_ = value; + onChanged(); + return this; + } + /** + * required int64 deliveryId = 1; + */ + public Builder clearDeliveryId() { + bitField0_ = (bitField0_ & ~0x00000001); + deliveryId_ = 0L; + onChanged(); + return this; + } + + // required string destination = 2; + private java.lang.Object destination_ = ""; + /** + * required string destination = 2; + */ + public boolean hasDestination() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * required string destination = 2; + */ + public java.lang.String getDestination() { + java.lang.Object ref = destination_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((com.google.protobuf.ByteString) ref) + .toStringUtf8(); + destination_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * required string destination = 2; + */ + public com.google.protobuf.ByteString + getDestinationBytes() { + java.lang.Object ref = destination_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + destination_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * required string destination = 2; + */ + public Builder setDestination( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + destination_ = value; + onChanged(); + return this; + } + /** + * required string destination = 2; + */ + public Builder clearDestination() { + bitField0_ = (bitField0_ & ~0x00000002); + destination_ = getDefaultInstance().getDestination(); + onChanged(); + return this; + } + /** + * required string destination = 2; + */ + public Builder setDestinationBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + destination_ = value; + onChanged(); + return this; + } + + // required .PersistentPayload payload = 3; + private akka.persistence.serialization.MessageFormats.PersistentPayload payload_ = akka.persistence.serialization.MessageFormats.PersistentPayload.getDefaultInstance(); + private com.google.protobuf.SingleFieldBuilder< + akka.persistence.serialization.MessageFormats.PersistentPayload, akka.persistence.serialization.MessageFormats.PersistentPayload.Builder, akka.persistence.serialization.MessageFormats.PersistentPayloadOrBuilder> payloadBuilder_; + /** + * required .PersistentPayload payload = 3; + */ + public boolean hasPayload() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * required .PersistentPayload payload = 3; + */ + public akka.persistence.serialization.MessageFormats.PersistentPayload getPayload() { + if (payloadBuilder_ == null) { + return payload_; + } else { + return payloadBuilder_.getMessage(); + } + } + /** + * required .PersistentPayload payload = 3; + */ + public Builder setPayload(akka.persistence.serialization.MessageFormats.PersistentPayload value) { + if (payloadBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + payload_ = value; + onChanged(); + } else { + payloadBuilder_.setMessage(value); + } + bitField0_ |= 0x00000004; + return this; + } + /** + * required .PersistentPayload payload = 3; + */ + public Builder setPayload( + akka.persistence.serialization.MessageFormats.PersistentPayload.Builder builderForValue) { + if (payloadBuilder_ == null) { + payload_ = builderForValue.build(); + onChanged(); + } else { + payloadBuilder_.setMessage(builderForValue.build()); + } + bitField0_ |= 0x00000004; + return this; + } + /** + * required .PersistentPayload payload = 3; + */ + public Builder mergePayload(akka.persistence.serialization.MessageFormats.PersistentPayload value) { + if (payloadBuilder_ == null) { + if (((bitField0_ & 0x00000004) == 0x00000004) && + payload_ != akka.persistence.serialization.MessageFormats.PersistentPayload.getDefaultInstance()) { + payload_ = + akka.persistence.serialization.MessageFormats.PersistentPayload.newBuilder(payload_).mergeFrom(value).buildPartial(); + } else { + payload_ = value; + } + onChanged(); + } else { + payloadBuilder_.mergeFrom(value); + } + bitField0_ |= 0x00000004; + return this; + } + /** + * required .PersistentPayload payload = 3; + */ + public Builder clearPayload() { + if (payloadBuilder_ == null) { + payload_ = akka.persistence.serialization.MessageFormats.PersistentPayload.getDefaultInstance(); + onChanged(); + } else { + payloadBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000004); + return this; + } + /** + * required .PersistentPayload payload = 3; + */ + public akka.persistence.serialization.MessageFormats.PersistentPayload.Builder getPayloadBuilder() { + bitField0_ |= 0x00000004; + onChanged(); + return getPayloadFieldBuilder().getBuilder(); + } + /** + * required .PersistentPayload payload = 3; + */ + public akka.persistence.serialization.MessageFormats.PersistentPayloadOrBuilder getPayloadOrBuilder() { + if (payloadBuilder_ != null) { + return payloadBuilder_.getMessageOrBuilder(); + } else { + return payload_; + } + } + /** + * required .PersistentPayload payload = 3; + */ + private com.google.protobuf.SingleFieldBuilder< + akka.persistence.serialization.MessageFormats.PersistentPayload, akka.persistence.serialization.MessageFormats.PersistentPayload.Builder, akka.persistence.serialization.MessageFormats.PersistentPayloadOrBuilder> + getPayloadFieldBuilder() { + if (payloadBuilder_ == null) { + payloadBuilder_ = new com.google.protobuf.SingleFieldBuilder< + akka.persistence.serialization.MessageFormats.PersistentPayload, akka.persistence.serialization.MessageFormats.PersistentPayload.Builder, akka.persistence.serialization.MessageFormats.PersistentPayloadOrBuilder>( + payload_, + getParentForChildren(), + isClean()); + payload_ = null; + } + return payloadBuilder_; + } + + // @@protoc_insertion_point(builder_scope:AtLeastOnceDeliverySnapshot.UnconfirmedDelivery) + } + + static { + defaultInstance = new UnconfirmedDelivery(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:AtLeastOnceDeliverySnapshot.UnconfirmedDelivery) + } + + private int bitField0_; + // required int64 currentDeliveryId = 1; + public static final int CURRENTDELIVERYID_FIELD_NUMBER = 1; + private long currentDeliveryId_; + /** + * required int64 currentDeliveryId = 1; + */ + public boolean hasCurrentDeliveryId() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required int64 currentDeliveryId = 1; + */ + public long getCurrentDeliveryId() { + return currentDeliveryId_; + } + + // repeated .AtLeastOnceDeliverySnapshot.UnconfirmedDelivery unconfirmedDeliveries = 2; + public static final int UNCONFIRMEDDELIVERIES_FIELD_NUMBER = 2; + private java.util.List unconfirmedDeliveries_; + /** + * repeated .AtLeastOnceDeliverySnapshot.UnconfirmedDelivery unconfirmedDeliveries = 2; + */ + public java.util.List getUnconfirmedDeliveriesList() { + return unconfirmedDeliveries_; + } + /** + * repeated .AtLeastOnceDeliverySnapshot.UnconfirmedDelivery unconfirmedDeliveries = 2; + */ + public java.util.List + getUnconfirmedDeliveriesOrBuilderList() { + return unconfirmedDeliveries_; + } + /** + * repeated .AtLeastOnceDeliverySnapshot.UnconfirmedDelivery unconfirmedDeliveries = 2; + */ + public int getUnconfirmedDeliveriesCount() { + return unconfirmedDeliveries_.size(); + } + /** + * repeated .AtLeastOnceDeliverySnapshot.UnconfirmedDelivery unconfirmedDeliveries = 2; + */ + public akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery getUnconfirmedDeliveries(int index) { + return unconfirmedDeliveries_.get(index); + } + /** + * repeated .AtLeastOnceDeliverySnapshot.UnconfirmedDelivery unconfirmedDeliveries = 2; + */ + public akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDeliveryOrBuilder getUnconfirmedDeliveriesOrBuilder( + int index) { + return unconfirmedDeliveries_.get(index); + } + + private void initFields() { + currentDeliveryId_ = 0L; + unconfirmedDeliveries_ = java.util.Collections.emptyList(); + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasCurrentDeliveryId()) { + memoizedIsInitialized = 0; + return false; + } + for (int i = 0; i < getUnconfirmedDeliveriesCount(); i++) { + if (!getUnconfirmedDeliveries(i).isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeInt64(1, currentDeliveryId_); + } + for (int i = 0; i < unconfirmedDeliveries_.size(); i++) { + output.writeMessage(2, unconfirmedDeliveries_.get(i)); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeInt64Size(1, currentDeliveryId_); + } + for (int i = 0; i < unconfirmedDeliveries_.size(); i++) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(2, unconfirmedDeliveries_.get(i)); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code AtLeastOnceDeliverySnapshot} + */ + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshotOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.persistence.serialization.MessageFormats.internal_static_AtLeastOnceDeliverySnapshot_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.persistence.serialization.MessageFormats.internal_static_AtLeastOnceDeliverySnapshot_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.class, akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.Builder.class); + } + + // Construct using akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + getUnconfirmedDeliveriesFieldBuilder(); + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + currentDeliveryId_ = 0L; + bitField0_ = (bitField0_ & ~0x00000001); + if (unconfirmedDeliveriesBuilder_ == null) { + unconfirmedDeliveries_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000002); + } else { + unconfirmedDeliveriesBuilder_.clear(); + } + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return akka.persistence.serialization.MessageFormats.internal_static_AtLeastOnceDeliverySnapshot_descriptor; + } + + public akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot getDefaultInstanceForType() { + return akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.getDefaultInstance(); + } + + public akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot build() { + akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot buildPartial() { + akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot result = new akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.currentDeliveryId_ = currentDeliveryId_; + if (unconfirmedDeliveriesBuilder_ == null) { + if (((bitField0_ & 0x00000002) == 0x00000002)) { + unconfirmedDeliveries_ = java.util.Collections.unmodifiableList(unconfirmedDeliveries_); + bitField0_ = (bitField0_ & ~0x00000002); + } + result.unconfirmedDeliveries_ = unconfirmedDeliveries_; + } else { + result.unconfirmedDeliveries_ = unconfirmedDeliveriesBuilder_.build(); + } + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot) { + return mergeFrom((akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot other) { + if (other == akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.getDefaultInstance()) return this; + if (other.hasCurrentDeliveryId()) { + setCurrentDeliveryId(other.getCurrentDeliveryId()); + } + if (unconfirmedDeliveriesBuilder_ == null) { + if (!other.unconfirmedDeliveries_.isEmpty()) { + if (unconfirmedDeliveries_.isEmpty()) { + unconfirmedDeliveries_ = other.unconfirmedDeliveries_; + bitField0_ = (bitField0_ & ~0x00000002); + } else { + ensureUnconfirmedDeliveriesIsMutable(); + unconfirmedDeliveries_.addAll(other.unconfirmedDeliveries_); + } + onChanged(); + } + } else { + if (!other.unconfirmedDeliveries_.isEmpty()) { + if (unconfirmedDeliveriesBuilder_.isEmpty()) { + unconfirmedDeliveriesBuilder_.dispose(); + unconfirmedDeliveriesBuilder_ = null; + unconfirmedDeliveries_ = other.unconfirmedDeliveries_; + bitField0_ = (bitField0_ & ~0x00000002); + unconfirmedDeliveriesBuilder_ = + com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders ? + getUnconfirmedDeliveriesFieldBuilder() : null; + } else { + unconfirmedDeliveriesBuilder_.addAllMessages(other.unconfirmedDeliveries_); + } + } + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasCurrentDeliveryId()) { + + return false; + } + for (int i = 0; i < getUnconfirmedDeliveriesCount(); i++) { + if (!getUnconfirmedDeliveries(i).isInitialized()) { + + return false; + } + } + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // required int64 currentDeliveryId = 1; + private long currentDeliveryId_ ; + /** + * required int64 currentDeliveryId = 1; + */ + public boolean hasCurrentDeliveryId() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required int64 currentDeliveryId = 1; + */ + public long getCurrentDeliveryId() { + return currentDeliveryId_; + } + /** + * required int64 currentDeliveryId = 1; + */ + public Builder setCurrentDeliveryId(long value) { + bitField0_ |= 0x00000001; + currentDeliveryId_ = value; + onChanged(); + return this; + } + /** + * required int64 currentDeliveryId = 1; + */ + public Builder clearCurrentDeliveryId() { + bitField0_ = (bitField0_ & ~0x00000001); + currentDeliveryId_ = 0L; + onChanged(); + return this; + } + + // repeated .AtLeastOnceDeliverySnapshot.UnconfirmedDelivery unconfirmedDeliveries = 2; + private java.util.List unconfirmedDeliveries_ = + java.util.Collections.emptyList(); + private void ensureUnconfirmedDeliveriesIsMutable() { + if (!((bitField0_ & 0x00000002) == 0x00000002)) { + unconfirmedDeliveries_ = new java.util.ArrayList(unconfirmedDeliveries_); + bitField0_ |= 0x00000002; + } + } + + private com.google.protobuf.RepeatedFieldBuilder< + akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery, akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery.Builder, akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDeliveryOrBuilder> unconfirmedDeliveriesBuilder_; + + /** + * repeated .AtLeastOnceDeliverySnapshot.UnconfirmedDelivery unconfirmedDeliveries = 2; + */ + public java.util.List getUnconfirmedDeliveriesList() { + if (unconfirmedDeliveriesBuilder_ == null) { + return java.util.Collections.unmodifiableList(unconfirmedDeliveries_); + } else { + return unconfirmedDeliveriesBuilder_.getMessageList(); + } + } + /** + * repeated .AtLeastOnceDeliverySnapshot.UnconfirmedDelivery unconfirmedDeliveries = 2; + */ + public int getUnconfirmedDeliveriesCount() { + if (unconfirmedDeliveriesBuilder_ == null) { + return unconfirmedDeliveries_.size(); + } else { + return unconfirmedDeliveriesBuilder_.getCount(); + } + } + /** + * repeated .AtLeastOnceDeliverySnapshot.UnconfirmedDelivery unconfirmedDeliveries = 2; + */ + public akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery getUnconfirmedDeliveries(int index) { + if (unconfirmedDeliveriesBuilder_ == null) { + return unconfirmedDeliveries_.get(index); + } else { + return unconfirmedDeliveriesBuilder_.getMessage(index); + } + } + /** + * repeated .AtLeastOnceDeliverySnapshot.UnconfirmedDelivery unconfirmedDeliveries = 2; + */ + public Builder setUnconfirmedDeliveries( + int index, akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery value) { + if (unconfirmedDeliveriesBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + ensureUnconfirmedDeliveriesIsMutable(); + unconfirmedDeliveries_.set(index, value); + onChanged(); + } else { + unconfirmedDeliveriesBuilder_.setMessage(index, value); + } + return this; + } + /** + * repeated .AtLeastOnceDeliverySnapshot.UnconfirmedDelivery unconfirmedDeliveries = 2; + */ + public Builder setUnconfirmedDeliveries( + int index, akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery.Builder builderForValue) { + if (unconfirmedDeliveriesBuilder_ == null) { + ensureUnconfirmedDeliveriesIsMutable(); + unconfirmedDeliveries_.set(index, builderForValue.build()); + onChanged(); + } else { + unconfirmedDeliveriesBuilder_.setMessage(index, builderForValue.build()); + } + return this; + } + /** + * repeated .AtLeastOnceDeliverySnapshot.UnconfirmedDelivery unconfirmedDeliveries = 2; + */ + public Builder addUnconfirmedDeliveries(akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery value) { + if (unconfirmedDeliveriesBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + ensureUnconfirmedDeliveriesIsMutable(); + unconfirmedDeliveries_.add(value); + onChanged(); + } else { + unconfirmedDeliveriesBuilder_.addMessage(value); + } + return this; + } + /** + * repeated .AtLeastOnceDeliverySnapshot.UnconfirmedDelivery unconfirmedDeliveries = 2; + */ + public Builder addUnconfirmedDeliveries( + int index, akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery value) { + if (unconfirmedDeliveriesBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + ensureUnconfirmedDeliveriesIsMutable(); + unconfirmedDeliveries_.add(index, value); + onChanged(); + } else { + unconfirmedDeliveriesBuilder_.addMessage(index, value); + } + return this; + } + /** + * repeated .AtLeastOnceDeliverySnapshot.UnconfirmedDelivery unconfirmedDeliveries = 2; + */ + public Builder addUnconfirmedDeliveries( + akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery.Builder builderForValue) { + if (unconfirmedDeliveriesBuilder_ == null) { + ensureUnconfirmedDeliveriesIsMutable(); + unconfirmedDeliveries_.add(builderForValue.build()); + onChanged(); + } else { + unconfirmedDeliveriesBuilder_.addMessage(builderForValue.build()); + } + return this; + } + /** + * repeated .AtLeastOnceDeliverySnapshot.UnconfirmedDelivery unconfirmedDeliveries = 2; + */ + public Builder addUnconfirmedDeliveries( + int index, akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery.Builder builderForValue) { + if (unconfirmedDeliveriesBuilder_ == null) { + ensureUnconfirmedDeliveriesIsMutable(); + unconfirmedDeliveries_.add(index, builderForValue.build()); + onChanged(); + } else { + unconfirmedDeliveriesBuilder_.addMessage(index, builderForValue.build()); + } + return this; + } + /** + * repeated .AtLeastOnceDeliverySnapshot.UnconfirmedDelivery unconfirmedDeliveries = 2; + */ + public Builder addAllUnconfirmedDeliveries( + java.lang.Iterable values) { + if (unconfirmedDeliveriesBuilder_ == null) { + ensureUnconfirmedDeliveriesIsMutable(); + super.addAll(values, unconfirmedDeliveries_); + onChanged(); + } else { + unconfirmedDeliveriesBuilder_.addAllMessages(values); + } + return this; + } + /** + * repeated .AtLeastOnceDeliverySnapshot.UnconfirmedDelivery unconfirmedDeliveries = 2; + */ + public Builder clearUnconfirmedDeliveries() { + if (unconfirmedDeliveriesBuilder_ == null) { + unconfirmedDeliveries_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000002); + onChanged(); + } else { + unconfirmedDeliveriesBuilder_.clear(); + } + return this; + } + /** + * repeated .AtLeastOnceDeliverySnapshot.UnconfirmedDelivery unconfirmedDeliveries = 2; + */ + public Builder removeUnconfirmedDeliveries(int index) { + if (unconfirmedDeliveriesBuilder_ == null) { + ensureUnconfirmedDeliveriesIsMutable(); + unconfirmedDeliveries_.remove(index); + onChanged(); + } else { + unconfirmedDeliveriesBuilder_.remove(index); + } + return this; + } + /** + * repeated .AtLeastOnceDeliverySnapshot.UnconfirmedDelivery unconfirmedDeliveries = 2; + */ + public akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery.Builder getUnconfirmedDeliveriesBuilder( + int index) { + return getUnconfirmedDeliveriesFieldBuilder().getBuilder(index); + } + /** + * repeated .AtLeastOnceDeliverySnapshot.UnconfirmedDelivery unconfirmedDeliveries = 2; + */ + public akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDeliveryOrBuilder getUnconfirmedDeliveriesOrBuilder( + int index) { + if (unconfirmedDeliveriesBuilder_ == null) { + return unconfirmedDeliveries_.get(index); } else { + return unconfirmedDeliveriesBuilder_.getMessageOrBuilder(index); + } + } + /** + * repeated .AtLeastOnceDeliverySnapshot.UnconfirmedDelivery unconfirmedDeliveries = 2; + */ + public java.util.List + getUnconfirmedDeliveriesOrBuilderList() { + if (unconfirmedDeliveriesBuilder_ != null) { + return unconfirmedDeliveriesBuilder_.getMessageOrBuilderList(); + } else { + return java.util.Collections.unmodifiableList(unconfirmedDeliveries_); + } + } + /** + * repeated .AtLeastOnceDeliverySnapshot.UnconfirmedDelivery unconfirmedDeliveries = 2; + */ + public akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery.Builder addUnconfirmedDeliveriesBuilder() { + return getUnconfirmedDeliveriesFieldBuilder().addBuilder( + akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery.getDefaultInstance()); + } + /** + * repeated .AtLeastOnceDeliverySnapshot.UnconfirmedDelivery unconfirmedDeliveries = 2; + */ + public akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery.Builder addUnconfirmedDeliveriesBuilder( + int index) { + return getUnconfirmedDeliveriesFieldBuilder().addBuilder( + index, akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery.getDefaultInstance()); + } + /** + * repeated .AtLeastOnceDeliverySnapshot.UnconfirmedDelivery unconfirmedDeliveries = 2; + */ + public java.util.List + getUnconfirmedDeliveriesBuilderList() { + return getUnconfirmedDeliveriesFieldBuilder().getBuilderList(); + } + private com.google.protobuf.RepeatedFieldBuilder< + akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery, akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery.Builder, akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDeliveryOrBuilder> + getUnconfirmedDeliveriesFieldBuilder() { + if (unconfirmedDeliveriesBuilder_ == null) { + unconfirmedDeliveriesBuilder_ = new com.google.protobuf.RepeatedFieldBuilder< + akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery, akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery.Builder, akka.persistence.serialization.MessageFormats.AtLeastOnceDeliverySnapshot.UnconfirmedDeliveryOrBuilder>( + unconfirmedDeliveries_, + ((bitField0_ & 0x00000002) == 0x00000002), + getParentForChildren(), + isClean()); + unconfirmedDeliveries_ = null; + } + return unconfirmedDeliveriesBuilder_; + } + + // @@protoc_insertion_point(builder_scope:AtLeastOnceDeliverySnapshot) + } + + static { + defaultInstance = new AtLeastOnceDeliverySnapshot(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:AtLeastOnceDeliverySnapshot) + } + private static com.google.protobuf.Descriptors.Descriptor internal_static_PersistentMessageBatch_descriptor; private static @@ -4607,6 +6161,16 @@ public final class MessageFormats { private static com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_DeliverMessage_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor + internal_static_AtLeastOnceDeliverySnapshot_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_AtLeastOnceDeliverySnapshot_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor + internal_static_AtLeastOnceDeliverySnapshot_UnconfirmedDelivery_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_AtLeastOnceDeliverySnapshot_UnconfirmedDelivery_fieldAccessorTable; public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() { @@ -4632,7 +6196,13 @@ public final class MessageFormats { "\032\n\022deliverySequenceNr\030\004 \001(\003\022\017\n\007channel\030\005" + " \001(\t\"M\n\016DeliverMessage\022&\n\npersistent\030\001 \001" + "(\0132\022.PersistentMessage\022\023\n\013destination\030\002 " + - "\001(\tB\"\n\036akka.persistence.serializationH\001" + "\001(\t\"\356\001\n\033AtLeastOnceDeliverySnapshot\022\031\n\021c" + + "urrentDeliveryId\030\001 \002(\003\022O\n\025unconfirmedDel" + + "iveries\030\002 \003(\01320.AtLeastOnceDeliverySnaps" + + "hot.UnconfirmedDelivery\032c\n\023UnconfirmedDe", + "livery\022\022\n\ndeliveryId\030\001 \002(\003\022\023\n\013destinatio" + + "n\030\002 \002(\t\022#\n\007payload\030\003 \002(\0132\022.PersistentPay" + + "loadB\"\n\036akka.persistence.serializationH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -4669,6 +6239,18 @@ public final class MessageFormats { com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_DeliverMessage_descriptor, new java.lang.String[] { "Persistent", "Destination", }); + internal_static_AtLeastOnceDeliverySnapshot_descriptor = + getDescriptor().getMessageTypes().get(5); + internal_static_AtLeastOnceDeliverySnapshot_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_AtLeastOnceDeliverySnapshot_descriptor, + new java.lang.String[] { "CurrentDeliveryId", "UnconfirmedDeliveries", }); + internal_static_AtLeastOnceDeliverySnapshot_UnconfirmedDelivery_descriptor = + internal_static_AtLeastOnceDeliverySnapshot_descriptor.getNestedTypes().get(0); + internal_static_AtLeastOnceDeliverySnapshot_UnconfirmedDelivery_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_AtLeastOnceDeliverySnapshot_UnconfirmedDelivery_descriptor, + new java.lang.String[] { "DeliveryId", "Destination", "Payload", }); return null; } }; diff --git a/akka-persistence/src/main/protobuf/MessageFormats.proto b/akka-persistence/src/main/protobuf/MessageFormats.proto index 3cc5db7276..3b28ef6a7f 100644 --- a/akka-persistence/src/main/protobuf/MessageFormats.proto +++ b/akka-persistence/src/main/protobuf/MessageFormats.proto @@ -39,4 +39,15 @@ message DeliveredMessage { message DeliverMessage { optional PersistentMessage persistent = 1; optional string destination = 2; -} \ No newline at end of file +} + +message AtLeastOnceDeliverySnapshot { + message UnconfirmedDelivery { + required int64 deliveryId = 1; + required string destination = 2; + required PersistentPayload payload = 3; + } + + required int64 currentDeliveryId = 1; + repeated UnconfirmedDelivery unconfirmedDeliveries = 2; +} diff --git a/akka-persistence/src/main/resources/reference.conf b/akka-persistence/src/main/resources/reference.conf index 2f0cb3326a..c608c78387 100644 --- a/akka-persistence/src/main/resources/reference.conf +++ b/akka-persistence/src/main/resources/reference.conf @@ -145,6 +145,19 @@ akka { # -1 for no upper limit. auto-update-replay-max = -1 } + + at-least-once-delivery { + # Interval between redelivery attempts + redeliver-interval = 5s + + # After this number of delivery attempts a `ReliableRedelivery.UnconfirmedWarning` + # message will be sent to the actor. + warn-after-number-of-unconfirmed-attempts = 5 + + # Maximum number of unconfirmed messages that an actor with AtLeastOnceDelivery is + # allowed to hold in memory. + max-unconfirmed-messages = 100000 + } dispatchers { default-plugin-dispatcher { diff --git a/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala b/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala new file mode 100644 index 0000000000..da73bf052e --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala @@ -0,0 +1,355 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.persistence + +import scala.annotation.tailrec +import scala.collection.breakOut +import scala.collection.immutable +import scala.concurrent.duration.FiniteDuration +import akka.actor.Actor +import akka.actor.ActorPath +import akka.persistence.serialization.Message + +object AtLeastOnceDelivery { + + /** + * Snapshot of current `AtLeastOnceDelivery` state. Can be retrieved with + * [[AtLeastOnceDelivery#getDeliverySnapshot]] and saved with [[PersistentActor#saveSnapshot]]. + * During recovery the snapshot received in [[SnapshotOffer]] should be set + * with [[AtLeastOnceDelivery.setDeliverySnapshot]]. + */ + @SerialVersionUID(1L) + case class AtLeastOnceDeliverySnapshot(currentDeliveryId: Long, unconfirmedDeliveries: immutable.Seq[UnconfirmedDelivery]) + extends Message { + + /** + * Java API + */ + def getUnconfirmedDeliveries: java.util.List[UnconfirmedDelivery] = { + import scala.collection.JavaConverters._ + unconfirmedDeliveries.asJava + } + + } + + /** + * @see [[AtLeastOnceDelivery#warnAfterNumberOfUnconfirmedAttempts]] + */ + @SerialVersionUID(1L) + case class UnconfirmedWarning(unconfirmedDeliveries: immutable.Seq[UnconfirmedDelivery]) { + /** + * Java API + */ + def getUnconfirmedDeliveries: java.util.List[UnconfirmedDelivery] = { + import scala.collection.JavaConverters._ + unconfirmedDeliveries.asJava + } + } + + /** + * Information about a message that has not been confirmed. Included in [[UnconfirmedWarning]] + * and [[AtLeastOnceDeliverySnapshot]]. + */ + case class UnconfirmedDelivery(deliveryId: Long, destination: ActorPath, message: Any) { + /** + * Java API + */ + def getMessage(): AnyRef = message.asInstanceOf[AnyRef] + } + + /** + * @see [[AtLeastOnceDelivery#maxUnconfirmedMessages]] + */ + class MaxUnconfirmedMessagesExceededException(message: String) extends RuntimeException(message) + + private object Internal { + case class Delivery(destination: ActorPath, message: Any, timestamp: Long, attempt: Int) + case object RedeliveryTick + } + +} + +/** + * Use this trait with your `PersistentActor` to send messages with at-least-once + * delivery semantics to destinations. It takes care of re-sending messages when they + * have not been confirmed within a configurable timeout. Use the [[#deliver]] method to + * send a message to a destination. Call the [[#confirmDelivery]] method when the destination + * has replied with a confirmation message. + * + * At-least-once delivery implies that original message send order is not always retained + * and the destination may receive duplicate messages due to possible resends. + * + * The interval between redelivery attempts can be defined by [[#redeliverInterval]]. + * After a number of delivery attempts a [[AtLeastOnceDelivery.UnconfirmedWarning]] message + * will be sent to `self`. The re-sending will still continue, but you can choose to call + * [[#confirmDelivery]] to cancel the re-sending. + * + * The `AtLeastOnceDelivery` trait has a state consisting of unconfirmed messages and a + * sequence number. It does not store this state itself. You must persist events corresponding + * to the `deliver` and `confirmDelivery` invocations from your `PersistentActor` so that the + * state can be restored by calling the same methods during the recovery phase of the + * `PersistentActor`. Sometimes these events can be derived from other business level events, + * and sometimes you must create separate events. During recovery calls to `delivery` + * will not send out the message, but it will be sent later if no matching `confirmDelivery` + * was performed. + * + * Support for snapshots is provided by [[#getDeliverySnapshot]] and [[#setDeliverySnapshot]]. + * The `AtLeastOnceDeliverySnapshot` contains the full delivery state, including unconfirmed messages. + * If you need a custom snapshot for other parts of the actor state you must also include the + * `AtLeastOnceDeliverySnapshot`. It is serialized using protobuf with the ordinary Akka + * serialization mechanism. It is easiest to include the bytes of the `AtLeastOnceDeliverySnapshot` + * as a blob in your custom snapshot. + */ +trait AtLeastOnceDelivery extends Processor { + // FIXME The reason for extending Processor instead of PersistentActor is + // the class hierarchy for UntypedPersistentActorWithAtLeastOnceDelivery + import AtLeastOnceDelivery._ + import AtLeastOnceDelivery.Internal._ + + /** + * Interval between redelivery attempts. + * + * The default value can be configured with the + * `akka.persistence.at-least-once-delivery.redeliver-interval` + * configuration key. This method can be overridden by implementation classes to return + * non-default values. + */ + def redeliverInterval: FiniteDuration = defaultRedeliverInterval + + private val defaultRedeliverInterval: FiniteDuration = + Persistence(context.system).settings.atLeastOnceDelivery.redeliverInterval + + /** + * After this number of delivery attempts a [[AtLeastOnceDelivery.UnconfirmedWarning]] message + * will be sent to `self`. The count is reset after a restart. + * + * The default value can be configured with the + * `akka.persistence.at-least-once-delivery.warn-after-number-of-unconfirmed-attempts` + * configuration key. This method can be overridden by implementation classes to return + * non-default values. + */ + def warnAfterNumberOfUnconfirmedAttempts: Int = defaultWarnAfterNumberOfUnconfirmedAttempts + + private val defaultWarnAfterNumberOfUnconfirmedAttempts: Int = + Persistence(context.system).settings.atLeastOnceDelivery.warnAfterNumberOfUnconfirmedAttempts + + /** + * Maximum number of unconfirmed messages that this actor is allowed to hold in memory. + * If this number is exceed [[#deliver]] will not accept more messages and it will throw + * [[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException]]. + * + * The default value can be configured with the + * `akka.persistence.at-least-once-delivery.max-unconfirmed-messages` + * configuration key. This method can be overridden by implementation classes to return + * non-default values. + */ + def maxUnconfirmedMessages: Int = defaultMaxUnconfirmedMessages + + private val defaultMaxUnconfirmedMessages: Int = + Persistence(context.system).settings.atLeastOnceDelivery.maxUnconfirmedMessages + + private val redeliverTask = { + import context.dispatcher + val interval = redeliverInterval / 2 + context.system.scheduler.schedule(interval, interval, self, RedeliveryTick) + } + + private var deliverySequenceNr = 0L + private var unconfirmed = immutable.SortedMap.empty[Long, Delivery] + + private def nextDeliverySequenceNr(): Long = { + deliverySequenceNr += 1 + deliverySequenceNr + } + + /** + * Scala API: Send the message created by the `deliveryIdToMessage` function to + * the `destination` actor. It will retry sending the message until + * the delivery is confirmed with [[#confirmDelivery]]. Correlation + * between `deliver` and `confirmDelivery` is performed with the + * `deliveryId` that is provided as parameter to the `deliveryIdToMessage` + * function. The `deliveryId` is typically passed in the message to the + * destination, which replies with a message containing the same `deliveryId`. + * + * The `deliveryId` is a strictly monotonically increasing sequence number without + * gaps. The same sequence is used for all destinations of the actor, i.e. when sending + * to multiple destinations the destinations will see gaps in the sequence if no + * translation is performed. + * + * During recovery this method will not send out the message, but it will be sent + * later if no matching `confirmDelivery` was performed. + * + * This method will throw [[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException]] + * if [[numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]]. + */ + def deliver(destination: ActorPath, deliveryIdToMessage: Long ⇒ Any): Unit = { + if (unconfirmed.size >= maxUnconfirmedMessages) + throw new MaxUnconfirmedMessagesExceededException( + s"Too many unconfirmed messages, maximum allowed is [$maxUnconfirmedMessages]") + + val deliveryId = nextDeliverySequenceNr() + val now = System.nanoTime() + val d = Delivery(destination, deliveryIdToMessage(deliveryId), now, attempt = 0) + if (recoveryRunning) + unconfirmed = unconfirmed.updated(deliveryId, d) + else + send(deliveryId, d, now) + } + + /** + * Call this method when a message has been confirmed by the destination, + * or to abort re-sending. + * @see [[#deliver]] + * @return `true` the first time the `deliveryId` is confirmed, i.e. `false` for duplicate confirm + */ + def confirmDelivery(deliveryId: Long): Boolean = { + if (unconfirmed.contains(deliveryId)) { + unconfirmed -= deliveryId + true + } else false + } + + /** + * Number of messages that have not been confirmed yet. + */ + def numberOfUnconfirmed: Int = unconfirmed.size + + private def redeliverOverdue(): Unit = { + val now = System.nanoTime() + val deadline = now - redeliverInterval.toNanos + var warnings = Vector.empty[UnconfirmedDelivery] + unconfirmed foreach { + case (deliveryId, delivery) ⇒ + if (delivery.timestamp <= deadline) { + send(deliveryId, delivery, now) + if (delivery.attempt == warnAfterNumberOfUnconfirmedAttempts) + warnings :+= UnconfirmedDelivery(deliveryId, delivery.destination, delivery.message) + } + } + if (warnings.nonEmpty) + self ! UnconfirmedWarning(warnings) + } + + private def send(deliveryId: Long, d: Delivery, timestamp: Long): Unit = { + context.actorSelection(d.destination) ! d.message + unconfirmed = unconfirmed.updated(deliveryId, d.copy(timestamp = timestamp, attempt = d.attempt + 1)) + } + + /** + * Full state of the `AtLeastOnceDelivery`. It can be saved with [[PersistentActor#saveSnapshot]]. + * During recovery the snapshot received in [[SnapshotOffer]] should be set + * with [[#setDeliverySnapshot]]. + * + * The `AtLeastOnceDeliverySnapshot` contains the full delivery state, including unconfirmed messages. + * If you need a custom snapshot for other parts of the actor state you must also include the + * `AtLeastOnceDeliverySnapshot`. It is serialized using protobuf with the ordinary Akka + * serialization mechanism. It is easiest to include the bytes of the `AtLeastOnceDeliverySnapshot` + * as a blob in your custom snapshot. + */ + def getDeliverySnapshot: AtLeastOnceDeliverySnapshot = + AtLeastOnceDeliverySnapshot(deliverySequenceNr, + unconfirmed.map { case (deliveryId, d) ⇒ UnconfirmedDelivery(deliveryId, d.destination, d.message) }(breakOut)) + + /** + * If snapshot from [[#getDeliverySnapshot]] was saved it will be received during recovery + * in a [[SnapshotOffer]] message and should be set with this method. + */ + def setDeliverySnapshot(snapshot: AtLeastOnceDeliverySnapshot): Unit = { + deliverySequenceNr = snapshot.currentDeliveryId + val now = System.nanoTime() + unconfirmed = snapshot.unconfirmedDeliveries.map(d ⇒ + d.deliveryId -> Delivery(d.destination, d.message, now, 0))(breakOut) + } + + /** + * INTERNAL API + */ + override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = { + redeliverTask.cancel() + super.aroundPreRestart(reason, message) + } + + /** + * INTERNAL API + */ + override protected[akka] def aroundPostStop(): Unit = { + redeliverTask.cancel() + super.aroundPostStop() + } + + /** + * INTERNAL API + */ + override protected[akka] def aroundReceive(receive: Receive, message: Any): Unit = + message match { + case RedeliveryTick ⇒ redeliverOverdue() + case _ ⇒ super.aroundReceive(receive, message) + } +} + +/** + * Java API: Use this class instead of `UntypedPersistentActor` to send messages + * with at-least-once delivery semantics to destinations. + * Full documentation in [[AtLeastOnceDelivery]]. + * + * @see [[AtLeastOnceDelivery]] + */ +abstract class UntypedPersistentActorWithAtLeastOnceDelivery extends UntypedPersistentActor with AtLeastOnceDelivery { + /** + * Java API: Send the message created by the `deliveryIdToMessage` function to + * the `destination` actor. It will retry sending the message until + * the delivery is confirmed with [[#confirmDelivery]]. Correlation + * between `deliver` and `confirmDelivery` is performed with the + * `deliveryId` that is provided as parameter to the `deliveryIdToMessage` + * function. The `deliveryId` is typically passed in the message to the + * destination, which replies with a message containing the same `deliveryId`. + * + * The `deliveryId` is a strictly monotonically increasing sequence number without + * gaps. The same sequence is used for all destinations, i.e. when sending to + * multiple destinations the destinations will see gaps in the sequence if no + * translation is performed. + * + * During recovery this method will not send out the message, but it will be sent + * later if no matching `confirmDelivery` was performed. + * + * This method will throw [[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException]] + * if [[numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]]. + */ + def deliver(destination: ActorPath, deliveryIdToMessage: akka.japi.Function[java.lang.Long, Object]): Unit = + super.deliver(destination, id ⇒ deliveryIdToMessage.apply(id)) +} + +/** + * Java API compatible with lambda expressions + * + * Use this class instead of `UntypedPersistentActor` to send messages + * with at-least-once delivery semantics to destinations. + * Full documentation in [[AtLeastOnceDelivery]]. + * + * @see [[AtLeastOnceDelivery]] + */ +abstract class AbstractPersistentActorWithAtLeastOnceDelivery extends AbstractPersistentActor with AtLeastOnceDelivery { + /** + * Java API: Send the message created by the `deliveryIdToMessage` function to + * the `destination` actor. It will retry sending the message until + * the delivery is confirmed with [[#confirmDelivery]]. Correlation + * between `deliver` and `confirmDelivery` is performed with the + * `deliveryId` that is provided as parameter to the `deliveryIdToMessage` + * function. The `deliveryId` is typically passed in the message to the + * destination, which replies with a message containing the same `deliveryId`. + * + * The `deliveryId` is a strictly monotonically increasing sequence number without + * gaps. The same sequence is used for all destinations, i.e. when sending to + * multiple destinations the destinations will see gaps in the sequence if no + * translation is performed. + * + * During recovery this method will not send out the message, but it will be sent + * later if no matching `confirmDelivery` was performed. + * + * This method will throw [[AtLeastOnceDelivery.MaxUnconfirmedMessagesExceededException]] + * if [[numberOfUnconfirmed]] is greater than or equal to [[#maxUnconfirmedMessages]]. + */ + def deliver(destination: ActorPath, deliveryIdToMessage: akka.japi.Function[java.lang.Long, Object]): Unit = + super.deliver(destination, id ⇒ deliveryIdToMessage.apply(id)) +} diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index a13df08bfc..8c8a40d2a6 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -345,7 +345,7 @@ private[persistence] trait Eventsourced extends ProcessorImpl { /** * INTERNAL API. */ - final override protected[akka] def aroundReceive(receive: Receive, message: Any) { + override protected[akka] def aroundReceive(receive: Receive, message: Any) { currentState.aroundReceive(receive, message) } @@ -686,4 +686,4 @@ abstract class UntypedEventsourcedProcessor extends UntypedPersistentActor { @deprecated("AbstractEventsourcedProcessor will be removed in 2.4.x, instead extend the API equivalent `akka.persistence.PersistentProcessor`", since = "2.3.4") abstract class AbstractEventsourcedProcessor extends AbstractPersistentActor { override def persistenceId: String = processorId -} \ No newline at end of file +} diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala index 048e642d86..245dc2f68b 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala @@ -42,6 +42,18 @@ final class PersistenceSettings(config: Config) { if (v < 0) Long.MaxValue else v } + object atLeastOnceDelivery { + + val redeliverInterval: FiniteDuration = + config.getMillisDuration("at-least-once-delivery.redeliver-interval") + + val warnAfterNumberOfUnconfirmedAttempts: Int = + config.getInt("at-least-once-delivery.warn-after-number-of-unconfirmed-attempts") + + val maxUnconfirmedMessages: Int = + config.getInt("at-least-once-delivery.max-unconfirmed-messages") + } + /** * INTERNAL API. * diff --git a/akka-persistence/src/main/scala/akka/persistence/Processor.scala b/akka-persistence/src/main/scala/akka/persistence/Processor.scala index 030539aa1a..e28781627f 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Processor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Processor.scala @@ -245,21 +245,21 @@ private[akka] trait ProcessorImpl extends Actor with Recovery { /** * INTERNAL API. */ - final override protected[akka] def aroundPreStart(): Unit = { + override protected[akka] def aroundPreStart(): Unit = { try preStart() finally super.preStart() } /** * INTERNAL API. */ - final override protected[akka] def aroundPostStop(): Unit = { + override protected[akka] def aroundPostStop(): Unit = { try unstashAll(unstashFilterPredicate) finally postStop() } /** * INTERNAL API. */ - final override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = { + override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = { try { receiverStash.prepend(processorBatch.map(p ⇒ Envelope(p, p.sender, context.system))) receiverStash.unstashAll() diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala index 36a8d30863..21ca6e583b 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala @@ -5,14 +5,15 @@ package akka.persistence.serialization import scala.language.existentials - import com.google.protobuf._ - import akka.actor.{ ActorPath, ExtendedActorSystem } import akka.japi.Util.immutableSeq import akka.persistence._ import akka.persistence.serialization.MessageFormats._ import akka.serialization._ +import akka.persistence.AtLeastOnceDelivery.{ AtLeastOnceDeliverySnapshot ⇒ AtLeastOnceDeliverySnap } +import akka.persistence.AtLeastOnceDelivery.UnconfirmedDelivery +import scala.collection.immutable.VectorBuilder /** * Marker trait for all protobuf-serializable messages in `akka.persistence`. @@ -32,6 +33,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { val DeliveredByTransientChannelClass = classOf[DeliveredByChannel] val DeliveredByPersistentChannelClass = classOf[DeliveredByPersistentChannel] val DeliverClass = classOf[Deliver] + val AtLeastOnceDeliverySnapshotClass = classOf[AtLeastOnceDeliverySnap] def identifier: Int = 7 def includeManifest: Boolean = true @@ -52,6 +54,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { case c: DeliveredByChannel ⇒ deliveredMessageBuilder(c).build().toByteArray case c: DeliveredByPersistentChannel ⇒ deliveredMessageBuilder(c).build().toByteArray case d: Deliver ⇒ deliverMessageBuilder(d).build.toByteArray + case a: AtLeastOnceDeliverySnap ⇒ atLeastOnceDeliverySnapshotBuilder(a).build.toByteArray case _ ⇒ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}") } @@ -69,6 +72,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { case DeliveredByTransientChannelClass ⇒ delivered(DeliveredMessage.parseFrom(bytes)) case DeliveredByPersistentChannelClass ⇒ delivered(DeliveredMessage.parseFrom(bytes)) case DeliverClass ⇒ deliver(DeliverMessage.parseFrom(bytes)) + case AtLeastOnceDeliverySnapshotClass ⇒ atLeastOnceDeliverySnapshot(AtLeastOnceDeliverySnapshot.parseFrom(bytes)) case _ ⇒ throw new IllegalArgumentException(s"Can't deserialize object of type ${c}") } } @@ -84,6 +88,33 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { builder } + def atLeastOnceDeliverySnapshotBuilder(snap: AtLeastOnceDeliverySnap): AtLeastOnceDeliverySnapshot.Builder = { + val builder = AtLeastOnceDeliverySnapshot.newBuilder + builder.setCurrentDeliveryId(snap.currentDeliveryId) + snap.unconfirmedDeliveries.foreach { unconfirmed ⇒ + val unconfirmedBuilder = + AtLeastOnceDeliverySnapshot.UnconfirmedDelivery.newBuilder. + setDeliveryId(unconfirmed.deliveryId). + setDestination(unconfirmed.destination.toString). + setPayload(persistentPayloadBuilder(unconfirmed.message.asInstanceOf[AnyRef])) + builder.addUnconfirmedDeliveries(unconfirmedBuilder) + } + builder + } + + def atLeastOnceDeliverySnapshot(atLeastOnceDeliverySnapshot: AtLeastOnceDeliverySnapshot): AtLeastOnceDeliverySnap = { + import scala.collection.JavaConverters._ + val unconfirmedDeliveries = new VectorBuilder[UnconfirmedDelivery]() + atLeastOnceDeliverySnapshot.getUnconfirmedDeliveriesList().iterator().asScala foreach { next ⇒ + unconfirmedDeliveries += UnconfirmedDelivery(next.getDeliveryId, ActorPath.fromString(next.getDestination), + payload(next.getPayload)) + } + + AtLeastOnceDeliverySnap( + atLeastOnceDeliverySnapshot.getCurrentDeliveryId, + unconfirmedDeliveries.result()) + } + private def persistentMessageBatchBuilder(persistentBatch: PersistentBatch) = { val builder = PersistentMessageBatch.newBuilder persistentBatch.batch. diff --git a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliveryFailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliveryFailureSpec.scala new file mode 100644 index 0000000000..c446738b8d --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliveryFailureSpec.scala @@ -0,0 +1,177 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.persistence + +import scala.concurrent.duration._ +import scala.concurrent.forkjoin.ThreadLocalRandom +import scala.language.postfixOps + +import com.typesafe.config.ConfigFactory + +import akka.actor._ +import akka.testkit._ + +object AtLeastOnceDeliveryFailureSpec { + val config = ConfigFactory.parseString( + s""" + akka.persistence.sender.chaos.live-processing-failure-rate = 0.3 + akka.persistence.sender.chaos.replay-processing-failure-rate = 0.1 + akka.persistence.destination.chaos.confirm-failure-rate = 0.3 + akka.persistence.journal.plugin = "akka.persistence.journal.chaos" + akka.persistence.journal.chaos.write-failure-rate = 0.3 + akka.persistence.journal.chaos.confirm-failure-rate = 0.2 + akka.persistence.journal.chaos.delete-failure-rate = 0.3 + akka.persistence.journal.chaos.replay-failure-rate = 0.25 + akka.persistence.journal.chaos.read-highest-failure-rate = 0.1 + akka.persistence.journal.chaos.class = akka.persistence.journal.chaos.ChaosJournal + akka.persistence.snapshot-store.local.dir = "target/snapshots-at-least-once-delivery-failure-spec/" + """) + + val numMessages = 10 + + case object Start + case class Done(ints: Vector[Int]) + + case class ProcessingFailure(i: Int) + case class JournalingFailure(i: Int) + + case class Msg(deliveryId: Long, i: Int) + case class Confirm(deliveryId: Long, i: Int) + + sealed trait Evt + case class MsgSent(i: Int) extends Evt + case class MsgConfirmed(deliveryId: Long, i: Int) extends Evt + + trait ChaosSupport { this: Actor ⇒ + def random = ThreadLocalRandom.current + + def probe: ActorRef + + var state = Vector.empty[Int] + + def contains(i: Int): Boolean = + state.contains(i) + + def add(i: Int): Unit = { + state :+= i + if (state.length == numMessages) probe ! Done(state) + } + + def shouldFail(rate: Double) = + random.nextDouble() < rate + } + + class ChaosSender(destination: ActorRef, val probe: ActorRef) extends PersistentActor with ChaosSupport with ActorLogging with AtLeastOnceDelivery { + val config = context.system.settings.config.getConfig("akka.persistence.sender.chaos") + val liveProcessingFailureRate = config.getDouble("live-processing-failure-rate") + val replayProcessingFailureRate = config.getDouble("replay-processing-failure-rate") + + override def redeliverInterval = 500.milliseconds + + override def processorId = "chaosSender" + + def receiveCommand: Receive = { + case i: Int ⇒ + val failureRate = if (recoveryRunning) replayProcessingFailureRate else liveProcessingFailureRate + if (contains(i)) { + log.debug(debugMessage(s"ignored duplicate ${i}")) + } else { + persist(MsgSent(i)) { evt ⇒ + updateState(evt) + if (shouldFail(failureRate)) + throw new TestException(debugMessage(s"failed at payload ${i}")) + else + log.debug(debugMessage(s"processed payload ${i}")) + } + + } + + case Confirm(deliveryId, i) ⇒ persist(MsgConfirmed(deliveryId, i))(updateState) + + case PersistenceFailure(MsgSent(i), _, _) ⇒ + // inform sender about journaling failure so that it can resend + sender() ! JournalingFailure(i) + + case PersistenceFailure(MsgConfirmed(_, i), _, _) ⇒ + // ok, will be redelivered + } + + def receiveRecover: Receive = { + case evt: Evt ⇒ updateState(evt) + case RecoveryFailure(_) ⇒ + // journal failed during recovery, throw exception to re-recover processor + throw new TestException(debugMessage("recovery failed")) + } + + def updateState(evt: Evt): Unit = evt match { + case MsgSent(i) ⇒ + add(i) + deliver(destination.path, deliveryId ⇒ Msg(deliveryId, i)) + + case MsgConfirmed(deliveryId, i) ⇒ + confirmDelivery(deliveryId) + } + + private def debugMessage(msg: String): String = + s"[sender] ${msg} (mode = ${if (recoveryRunning) "replay" else "live"} snr = ${lastSequenceNr} state = ${state.sorted})" + } + + class ChaosDestination(val probe: ActorRef) extends Actor with ChaosSupport with ActorLogging { + val config = context.system.settings.config.getConfig("akka.persistence.destination.chaos") + val confirmFailureRate = config.getDouble("confirm-failure-rate") + + def receive = { + case m @ Msg(deliveryId, i) ⇒ + if (shouldFail(confirmFailureRate)) { + log.error(debugMessage("confirm message failed", m)) + } else if (contains(i)) { + log.debug(debugMessage("ignored duplicate", m)) + sender() ! Confirm(deliveryId, i) + } else { + add(i) + sender() ! Confirm(deliveryId, i) + log.debug(debugMessage("received and confirmed message", m)) + } + } + + private def debugMessage(msg: String, m: Msg): String = + s"[destination] ${msg} (message = $m)" + } + + class ChaosApp(probe: ActorRef) extends Actor with ActorLogging { + val destination = context.actorOf(Props(classOf[ChaosDestination], probe), "destination") + val snd = context.actorOf(Props(classOf[ChaosSender], destination, probe), "sender") + + def receive = { + case Start ⇒ 1 to numMessages foreach (snd ! _) + case ProcessingFailure(i) ⇒ + snd ! i + log.debug(s"resent ${i} after processing failure") + case JournalingFailure(i) ⇒ + snd ! i + log.debug(s"resent ${i} after journaling failure") + } + } +} + +class AtLeastOnceDeliveryFailureSpec extends AkkaSpec(AtLeastOnceDeliveryFailureSpec.config) with Cleanup with ImplicitSender { + import AtLeastOnceDeliveryFailureSpec._ + + "AtLeastOnceDelivery" must { + "tolerate and recover from random failures" in { + system.actorOf(Props(classOf[ChaosApp], testActor), "chaosApp") ! Start + expectDone() // by sender + expectDone() // by destination + + system.actorOf(Props(classOf[ChaosApp], testActor), "chaosApp2") // recovery of new instance should have same outcome + expectDone() // by sender + // destination doesn't receive messages again because all have been confirmed already + } + } + + def expectDone() = within(numMessages.seconds) { + expectMsgType[Done].ints.sorted should be(1 to numMessages toVector) + } +} diff --git a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala new file mode 100644 index 0000000000..71f8910c5a --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala @@ -0,0 +1,293 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.persistence + +import scala.concurrent.duration._ +import scala.util.control.NoStackTrace +import com.typesafe.config._ +import akka.actor._ +import akka.testkit._ +import akka.persistence.AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot +import akka.persistence.AtLeastOnceDelivery.UnconfirmedWarning +import akka.persistence.AtLeastOnceDelivery.UnconfirmedWarning + +object AtLeastOnceDeliverySpec { + + case class Req(payload: String) + case object ReqAck + case object InvalidReq + + sealed trait Evt + case class AcceptedReq(payload: String, destination: ActorPath) extends Evt + case class ReqDone(id: Long) extends Evt + + case class Action(id: Long, payload: String) + case class ActionAck(id: Long) + case object Boom + case object SaveSnap + case class Snap(deliverySnapshot: AtLeastOnceDeliverySnapshot) // typically includes some user data as well + + def senderProps(testActor: ActorRef, name: String, + redeliverInterval: FiniteDuration, warnAfterNumberOfUnconfirmedAttempts: Int, + async: Boolean, destinations: Map[String, ActorPath]): Props = + Props(new Sender(testActor, name, redeliverInterval, warnAfterNumberOfUnconfirmedAttempts, async, destinations)) + + class Sender(testActor: ActorRef, + name: String, + override val redeliverInterval: FiniteDuration, + override val warnAfterNumberOfUnconfirmedAttempts: Int, + async: Boolean, + destinations: Map[String, ActorPath]) + extends PersistentActor with AtLeastOnceDelivery with ActorLogging { + + override def processorId: String = name + + def updateState(evt: Evt): Unit = evt match { + case AcceptedReq(payload, destination) ⇒ + deliver(destination, deliveryId ⇒ Action(deliveryId, payload)) + case ReqDone(id) ⇒ + confirmDelivery(id) + } + + val receiveCommand: Receive = { + case Req(payload) ⇒ + if (payload.isEmpty) + sender() ! InvalidReq + else { + val destination = destinations(payload.take(1).toUpperCase) + if (async) + persistAsync(AcceptedReq(payload, destination)) { evt ⇒ + updateState(evt) + sender() ! ReqAck + } + else + persist(AcceptedReq(payload, destination)) { evt ⇒ + updateState(evt) + sender() ! ReqAck + } + } + + case ActionAck(id) ⇒ + log.debug("Sender got ack {}", id) + if (confirmDelivery(id)) + if (async) + persistAsync(ReqDone(id)) { evt ⇒ updateState(evt) } + else + persist(ReqDone(id)) { evt ⇒ updateState(evt) } + + case Boom ⇒ + throw new RuntimeException("boom") with NoStackTrace + + case SaveSnap ⇒ + saveSnapshot(Snap(getDeliverySnapshot)) + + case w: UnconfirmedWarning ⇒ + testActor ! w + + } + + def receiveRecover: Receive = { + case evt: Evt ⇒ updateState(evt) + case SnapshotOffer(_, Snap(deliverySnapshot)) ⇒ + setDeliverySnapshot(deliverySnapshot) + + } + } + + def destinationProps(testActor: ActorRef): Props = + Props(new Destination(testActor)) + + class Destination(testActor: ActorRef) extends Actor with ActorLogging { + + var allReceived = Set.empty[Long] + + def receive = { + case a @ Action(id, payload) ⇒ + // discard duplicates (naive impl) + if (!allReceived.contains(id)) { + log.debug("Destination got {}, all count {}", a, allReceived.size + 1) + testActor ! a + allReceived += id + } + sender() ! ActionAck(id) + } + } + + def unreliableProps(dropMod: Int, target: ActorRef): Props = + Props(new Unreliable(dropMod, target)) + + class Unreliable(dropMod: Int, target: ActorRef) extends Actor with ActorLogging { + var count = 0 + def receive = { + case msg ⇒ + count += 1 + if (count % dropMod != 0) { + log.debug("Pass msg {} count {}", msg, count) + target forward msg + } else { + log.debug("Drop msg {} count {}", msg, count) + } + } + } + +} + +abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender { + import AtLeastOnceDeliverySpec._ + + "AtLeastOnceDelivery" must { + "deliver messages in order when nothing is lost" in { + val probeA = TestProbe() + val destinations = Map("A" -> system.actorOf(destinationProps(probeA.ref)).path) + val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, async = false, destinations), name) + snd ! Req("a") + expectMsg(ReqAck) + probeA.expectMsg(Action(1, "a")) + probeA.expectNoMsg(1.second) + } + + "re-deliver lost messages" in { + val probeA = TestProbe() + val dst = system.actorOf(destinationProps(probeA.ref)) + val destinations = Map("A" -> system.actorOf(unreliableProps(3, dst)).path) + val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, async = false, destinations), name) + snd ! Req("a-1") + expectMsg(ReqAck) + probeA.expectMsg(Action(1, "a-1")) + + snd ! Req("a-2") + expectMsg(ReqAck) + probeA.expectMsg(Action(2, "a-2")) + + snd ! Req("a-3") + snd ! Req("a-4") + expectMsg(ReqAck) + expectMsg(ReqAck) + // a-3 was lost + probeA.expectMsg(Action(4, "a-4")) + // and then re-delivered + probeA.expectMsg(Action(3, "a-3")) + probeA.expectNoMsg(1.second) + } + + "re-deliver lost messages after restart" in { + val probeA = TestProbe() + val dst = system.actorOf(destinationProps(probeA.ref)) + val destinations = Map("A" -> system.actorOf(unreliableProps(3, dst)).path) + val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, async = false, destinations), name) + snd ! Req("a-1") + expectMsg(ReqAck) + probeA.expectMsg(Action(1, "a-1")) + + snd ! Req("a-2") + expectMsg(ReqAck) + probeA.expectMsg(Action(2, "a-2")) + + snd ! Req("a-3") + snd ! Req("a-4") + expectMsg(ReqAck) + expectMsg(ReqAck) + // a-3 was lost + probeA.expectMsg(Action(4, "a-4")) + + // trigger restart + snd ! Boom + + // and then re-delivered + probeA.expectMsg(Action(3, "a-3")) + + snd ! Req("a-5") + expectMsg(ReqAck) + probeA.expectMsg(Action(5, "a-5")) + + probeA.expectNoMsg(1.second) + } + + "restore state from snapshot" in { + val probeA = TestProbe() + val dst = system.actorOf(destinationProps(probeA.ref)) + val destinations = Map("A" -> system.actorOf(unreliableProps(3, dst)).path) + val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, async = false, destinations), name) + snd ! Req("a-1") + expectMsg(ReqAck) + probeA.expectMsg(Action(1, "a-1")) + + snd ! Req("a-2") + expectMsg(ReqAck) + probeA.expectMsg(Action(2, "a-2")) + + snd ! Req("a-3") + snd ! Req("a-4") + snd ! SaveSnap + expectMsg(ReqAck) + expectMsg(ReqAck) + // a-3 was lost + probeA.expectMsg(Action(4, "a-4")) + + // trigger restart + snd ! Boom + + // and then re-delivered + probeA.expectMsg(Action(3, "a-3")) + + snd ! Req("a-5") + expectMsg(ReqAck) + probeA.expectMsg(Action(5, "a-5")) + + probeA.expectNoMsg(1.second) + } + + "warn about unconfirmed messages" in { + val probeA = TestProbe() + val probeB = TestProbe() + val destinations = Map("A" -> probeA.ref.path, "B" -> probeB.ref.path) + val snd = system.actorOf(senderProps(testActor, name, 500.millis, 3, async = false, destinations), name) + snd ! Req("a-1") + snd ! Req("b-1") + snd ! Req("b-2") + expectMsg(ReqAck) + expectMsg(ReqAck) + expectMsg(ReqAck) + val unconfirmed = receiveWhile(3.seconds) { + case UnconfirmedWarning(unconfirmed) ⇒ unconfirmed + }.flatten + unconfirmed.map(_.destination).toSet should be(Set(probeA.ref.path, probeB.ref.path)) + unconfirmed.map(_.message).toSet should be(Set(Action(1, "a-1"), Action(2, "b-1"), Action(3, "b-2"))) + system.stop(snd) + } + + "re-deliver many lost messages" in { + val probeA = TestProbe() + val probeB = TestProbe() + val probeC = TestProbe() + val dstA = system.actorOf(destinationProps(probeA.ref), "destination-a") + val dstB = system.actorOf(destinationProps(probeB.ref), "destination-b") + val dstC = system.actorOf(destinationProps(probeC.ref), "destination-c") + val destinations = Map( + "A" -> system.actorOf(unreliableProps(2, dstA), "unreliable-a").path, + "B" -> system.actorOf(unreliableProps(5, dstB), "unreliable-b").path, + "C" -> system.actorOf(unreliableProps(3, dstC), "unreliable-c").path) + val snd = system.actorOf(senderProps(testActor, name, 1000.millis, 5, async = true, destinations), name) + val N = 100 + for (n ← 1 to N) { + snd ! Req("a-" + n) + } + for (n ← 1 to N) { + snd ! Req("b-" + n) + } + for (n ← 1 to N) { + snd ! Req("c-" + n) + } + val deliverWithin = 20.seconds + probeA.receiveN(N, deliverWithin).map { case a: Action ⇒ a.payload }.toSet should be((1 to N).map(n ⇒ "a-" + n).toSet) + probeB.receiveN(N, deliverWithin).map { case a: Action ⇒ a.payload }.toSet should be((1 to N).map(n ⇒ "b-" + n).toSet) + probeC.receiveN(N, deliverWithin).map { case a: Action ⇒ a.payload }.toSet should be((1 to N).map(n ⇒ "c-" + n).toSet) + } + + } +} + +class LeveldbAtLeastOnceDeliverySpec extends AtLeastOnceDeliverySpec(PersistenceSpec.config("leveldb", "AtLeastOnceDeliverySpec")) +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class InmemAtLeastOnceDeliverySpec extends AtLeastOnceDeliverySpec(PersistenceSpec.config("inmem", "AtLeastOnceDeliverySpec")) diff --git a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala index 91f5797f45..e9efe922eb 100644 --- a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala @@ -5,14 +5,15 @@ package akka.persistence.serialization import scala.collection.immutable - import com.typesafe.config._ - import akka.actor._ import akka.persistence._ import akka.serialization._ import akka.testkit._ +import akka.persistence.AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot +import akka.persistence.AtLeastOnceDelivery.UnconfirmedDelivery + object SerializerSpecConfigs { val customSerializers = ConfigFactory.parseString( """ @@ -133,6 +134,35 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) { deserialized should be(confirmation) } } + + "given AtLeastOnceDeliverySnapshot" must { + "handle empty unconfirmed" in { + val unconfirmed = Vector.empty + val snap = AtLeastOnceDeliverySnapshot(13, unconfirmed) + val serializer = serialization.findSerializerFor(snap) + + val bytes = serializer.toBinary(snap) + val deserialized = serializer.fromBinary(bytes, Some(classOf[AtLeastOnceDeliverySnapshot])) + + deserialized should be(snap) + } + + "handle a few unconfirmed" in { + val unconfirmed = Vector( + UnconfirmedDelivery(deliveryId = 1, destination = testActor.path, "a"), + UnconfirmedDelivery(deliveryId = 2, destination = testActor.path, "b"), + UnconfirmedDelivery(deliveryId = 3, destination = testActor.path, 42)) + val snap = AtLeastOnceDeliverySnapshot(17, unconfirmed) + val serializer = serialization.findSerializerFor(snap) + + val bytes = serializer.toBinary(snap) + val deserialized = serializer.fromBinary(bytes, Some(classOf[AtLeastOnceDeliverySnapshot])) + + deserialized should be(snap) + } + + } + } } diff --git a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java index 4486a7fadc..13391ae8a3 100644 --- a/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java +++ b/akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java @@ -5,6 +5,7 @@ package doc; import akka.actor.AbstractActor; +import akka.actor.ActorPath; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; @@ -14,6 +15,7 @@ import scala.Option; import scala.PartialFunction; import scala.concurrent.duration.Duration; import scala.runtime.BoxedUnit; +import java.io.Serializable; import java.util.concurrent.TimeUnit; @@ -170,6 +172,93 @@ public class LambdaPersistenceDocTest { //#recovery-completed }; + static Object atLeastOnceExample = new Object() { + //#at-least-once-example + + class Msg implements Serializable { + public final long deliveryId; + public final String s; + + public Msg(long deliveryId, String s) { + this.deliveryId = deliveryId; + this.s = s; + } + } + + class Confirm implements Serializable { + public final long deliveryId; + + public Confirm(long deliveryId) { + this.deliveryId = deliveryId; + } + } + + + class MsgSent implements Serializable { + public final String s; + + public MsgSent(String s) { + this.s = s; + } + } + class MsgConfirmed implements Serializable { + public final long deliveryId; + + public MsgConfirmed(long deliveryId) { + this.deliveryId = deliveryId; + } + } + + class MyPersistentActor extends AbstractPersistentActorWithAtLeastOnceDelivery { + private final ActorPath destination; + + public MyPersistentActor(ActorPath destination) { + this.destination = destination; + } + + @Override + public PartialFunction receiveCommand() { + return ReceiveBuilder. + match(String.class, s -> { + persist(new MsgSent(s), evt -> updateState(evt)); + }). + match(Confirm.class, confirm -> { + persist(new MsgConfirmed(confirm.deliveryId), evt -> updateState(evt)); + }). + build(); + } + + @Override + public PartialFunction receiveRecover() { + return ReceiveBuilder. + match(Object.class, evt -> updateState(evt)).build(); + } + + void updateState(Object event) { + if (event instanceof MsgSent) { + final MsgSent evt = (MsgSent) event; + deliver(destination, deliveryId -> new Msg(deliveryId, evt.s)); + } else if (event instanceof MsgConfirmed) { + final MsgConfirmed evt = (MsgConfirmed) event; + confirmDelivery(evt.deliveryId); + } + } + } + + class MyDestination extends AbstractActor { + public MyDestination() { + receive(ReceiveBuilder. + match(Msg.class, msg -> { + // ... + sender().tell(new Confirm(msg.deliveryId), self()); + }).build() + ); + } + } + //#at-least-once-example + }; + + static Object o3 = new Object() { //#channel-example class MyProcessor extends AbstractProcessor {