Merge pull request #1808 from krasserm/wip-3681-performance-improvements-krasserm

!per #3681 Performance and consistency improvements
Patrik Nordwall 2013-10-30 08:17:01 -07:00
commit 6a6525fa1f
29 changed files with 1324 additions and 76 deletions


@@ -83,7 +83,7 @@ trait UnrestrictedStash extends Actor {
* The actor's deque-based message queue.
* `mailbox.queue` is the underlying `Deque`.
*/
protected[akka] val mailbox: DequeBasedMessageQueueSemantics = {
private[akka] val mailbox: DequeBasedMessageQueueSemantics = {
context.asInstanceOf[ActorCell].mailbox.messageQueue match {
case queue: DequeBasedMessageQueueSemantics ⇒ queue
case other ⇒ throw ActorInitializationException(self, s"DequeBasedMailbox required, got: ${other.getClass.getName}\n" +
@@ -136,7 +136,7 @@ trait UnrestrictedStash extends Actor {
* @param filterPredicate only stashed messages selected by this predicate are
* prepended to the mailbox.
*/
protected[akka] def unstashAll(filterPredicate: Any ⇒ Boolean): Unit = {
private[akka] def unstashAll(filterPredicate: Any ⇒ Boolean): Unit = {
try {
val i = theStash.reverseIterator.filter(envelope ⇒ filterPredicate(envelope.message))
while (i.hasNext) mailbox.enqueueFirst(self, i.next())
@@ -145,6 +145,17 @@ trait UnrestrictedStash extends Actor {
}
}
/**
* INTERNAL API.
*
* Clears the stash and returns all envelopes that have not been unstashed.
*/
private[akka] def clearStash(): Vector[Envelope] = {
val stashed = theStash
theStash = Vector.empty[Envelope]
stashed
}
/**
* Overridden callback. Prepends all messages in the stash to the mailbox,
* clears the stash, stops all children and invokes the postStop() callback.


@@ -9,6 +9,8 @@ import scala.Option;
import akka.actor.*;
import akka.persistence.*;
import static java.util.Arrays.asList;
public class PersistenceDocTest {
public interface ProcessorMethods {
@@ -237,4 +239,31 @@ public class PersistenceDocTest {
}
}
};
static Object o6 = new Object() {
//#batch-write
class MyProcessor extends UntypedProcessor {
public void onReceive(Object message) throws Exception {
if (message instanceof Persistent) {
Persistent p = (Persistent)message;
if (p.payload().equals("a")) { /* ... */ }
if (p.payload().equals("b")) { /* ... */ }
}
}
}
class Example {
final ActorSystem system = ActorSystem.create("example");
final ActorRef processor = system.actorOf(Props.create(MyProcessor.class));
public void batchWrite() {
processor.tell(PersistentBatch.create(asList(
Persistent.create("a"),
Persistent.create("b"))), null);
}
// ...
}
//#batch-write
};
}


@@ -45,6 +45,11 @@ public class PersistencePluginDocTest {
return null;
}
@Override
public Future<Void> doWriteBatchAsync(Iterable<PersistentImpl> persistentBatch) {
return null;
}
@Override
public Future<Void> doDeleteAsync(PersistentImpl persistent) {
return null;


@@ -265,6 +265,8 @@ If not specified, they default to ``SnapshotSelectionCriteria.latest()`` which s
To disable snapshot-based recovery, applications should use ``SnapshotSelectionCriteria.none()``. A recovery where no
saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay all journaled messages.
.. _event-sourcing-java:
Event sourcing
==============
@@ -318,6 +320,20 @@ The example also demonstrates how to change the processor's default behavior, de
another behavior, defined by ``otherCommandHandler``, and back using ``getContext().become()`` and
``getContext().unbecome()``. See also the API docs of ``persist`` for further details.
Batch writes
============
Applications may also send a batch of ``Persistent`` messages to a processor via a ``PersistentBatch`` message.
.. includecode:: code/docs/persistence/PersistenceDocTest.java#batch-write
``Persistent`` messages contained in a ``PersistentBatch`` message are written to the journal atomically but are
received by the processor separately (as ``Persistent`` messages). They are also replayed separately. Batch writes
can not only increase the throughput of a processor but may also be necessary for consistency reasons. For example,
in :ref:`event-sourcing-java`, all events that are generated and persisted by a single command are batch-written to
the journal. The recovery of an ``UntypedEventsourcedProcessor`` will therefore never be done partially, i.e. with
only a subset of the events persisted by a single command.
Storage plugins
===============


@@ -225,4 +225,22 @@ trait PersistenceDocSpec {
maxTimestamp = System.currentTimeMillis))
//#snapshot-criteria
}
new AnyRef {
import akka.actor.Props
//#batch-write
class MyProcessor extends Processor {
def receive = {
case Persistent("a", _) ⇒ // ...
case Persistent("b", _) ⇒ // ...
}
}
val system = ActorSystem("example")
val processor = system.actorOf(Props[MyProcessor])
processor ! PersistentBatch(Vector(Persistent("a"), Persistent("b")))
//#batch-write
system.shutdown()
}
}


@@ -6,6 +6,7 @@ package docs.persistence
//#plugin-imports
import scala.concurrent.Future
import scala.collection.immutable.Seq
//#plugin-imports
import com.typesafe.config._
@@ -69,6 +70,7 @@ class PersistencePluginDocSpec extends WordSpec {
class MyJournal extends AsyncWriteJournal {
def writeAsync(persistent: PersistentImpl): Future[Unit] = ???
def writeBatchAsync(persistentBatch: Seq[PersistentImpl]): Future[Unit] = ???
def deleteAsync(persistent: PersistentImpl): Future[Unit] = ???
def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit] = ???
def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: (PersistentImpl) ⇒ Unit): Future[Long] = ???


@@ -331,6 +331,20 @@ The example also demonstrates how to change the processor's default behavior, de
another behavior, defined by ``otherCommandHandler``, and back using ``context.become()`` and ``context.unbecome()``.
See also the API docs of ``persist`` for further details.
Batch writes
============
Applications may also send a batch of ``Persistent`` messages to a processor via a ``PersistentBatch`` message.
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#batch-write
``Persistent`` messages contained in a ``PersistentBatch`` message are written to the journal atomically but are
received by the processor separately (as ``Persistent`` messages). They are also replayed separately. Batch writes
can not only increase the throughput of a processor but may also be necessary for consistency reasons. For example,
in :ref:`event-sourcing`, all events that are generated and persisted by a single command are batch-written to the
journal. The recovery of an ``EventsourcedProcessor`` will therefore never be done partially, i.e. with only a subset
of the events persisted by a single command.
Storage plugins
===============


@@ -17,6 +17,14 @@ interface AsyncWritePlugin {
*/
Future<Void> doWriteAsync(PersistentImpl persistent);
/**
* Plugin Java API.
*
* Asynchronously writes a batch of persistent messages to the journal. The batch write
* must be atomic, i.e. either all persistent messages in the batch are written or none.
*/
Future<Void> doWriteBatchAsync(Iterable<PersistentImpl> persistentBatch);
/**
* Plugin Java API.
*

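The Scala side of this contract is the writeBatchAsync method on AsyncWriteJournal (shown later in this commit). As a reference point for plugin authors, here is a minimal Scala sketch of a journal honoring the all-or-nothing contract; writeAllOrNothing is a hypothetical transactional backend call, and the remaining methods are left as ??? in the style of the documentation specs:

import scala.collection.immutable.Seq
import scala.concurrent.Future
import akka.persistence.PersistentImpl
import akka.persistence.journal.AsyncWriteJournal

class MyBatchingJournal extends AsyncWriteJournal {
  import context.dispatcher

  // Hypothetical backend call, assumed to be transactional (all-or-nothing).
  private def writeAllOrNothing(batch: Seq[PersistentImpl]): Unit = ???

  // A single write is just a batch of one.
  def writeAsync(persistent: PersistentImpl): Future[Unit] =
    writeBatchAsync(Seq(persistent))

  // One Future covers the whole batch: it completes for all messages or
  // fails for all of them, matching the atomicity requirement stated above.
  def writeBatchAsync(persistentBatch: Seq[PersistentImpl]): Future[Unit] =
    Future(writeAllOrNothing(persistentBatch))

  def deleteAsync(persistent: PersistentImpl): Future[Unit] = ???
  def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit] = ???
  def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: PersistentImpl ⇒ Unit): Future[Long] = ???
}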

@@ -15,6 +15,14 @@ interface SyncWritePlugin {
*/
void doWrite(PersistentImpl persistent) throws Exception;
/**
* Plugin Java API.
*
* Synchronously writes a batch of persistent messages to the journal. The batch write
* must be atomic, i.e. either all persistent messages in the batch are written or none.
*/
void doWriteBatch(Iterable<PersistentImpl> persistentBatch);
/**
* Plugin Java API.
*


@@ -8,6 +8,692 @@ public final class MessageFormats {
public static void registerAllExtensions(
com.google.protobuf.ExtensionRegistry registry) {
}
public interface PersistentMessageBatchOrBuilder
extends com.google.protobuf.MessageOrBuilder {
// repeated .PersistentMessage batch = 1;
/**
* <code>repeated .PersistentMessage batch = 1;</code>
*/
java.util.List<akka.persistence.serialization.MessageFormats.PersistentMessage>
getBatchList();
/**
* <code>repeated .PersistentMessage batch = 1;</code>
*/
akka.persistence.serialization.MessageFormats.PersistentMessage getBatch(int index);
/**
* <code>repeated .PersistentMessage batch = 1;</code>
*/
int getBatchCount();
/**
* <code>repeated .PersistentMessage batch = 1;</code>
*/
java.util.List<? extends akka.persistence.serialization.MessageFormats.PersistentMessageOrBuilder>
getBatchOrBuilderList();
/**
* <code>repeated .PersistentMessage batch = 1;</code>
*/
akka.persistence.serialization.MessageFormats.PersistentMessageOrBuilder getBatchOrBuilder(
int index);
}
/**
* Protobuf type {@code PersistentMessageBatch}
*/
public static final class PersistentMessageBatch extends
com.google.protobuf.GeneratedMessage
implements PersistentMessageBatchOrBuilder {
// Use PersistentMessageBatch.newBuilder() to construct.
private PersistentMessageBatch(com.google.protobuf.GeneratedMessage.Builder<?> builder) {
super(builder);
this.unknownFields = builder.getUnknownFields();
}
private PersistentMessageBatch(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
private static final PersistentMessageBatch defaultInstance;
public static PersistentMessageBatch getDefaultInstance() {
return defaultInstance;
}
public PersistentMessageBatch getDefaultInstanceForType() {
return defaultInstance;
}
private final com.google.protobuf.UnknownFieldSet unknownFields;
@java.lang.Override
public final com.google.protobuf.UnknownFieldSet
getUnknownFields() {
return this.unknownFields;
}
private PersistentMessageBatch(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
initFields();
int mutable_bitField0_ = 0;
com.google.protobuf.UnknownFieldSet.Builder unknownFields =
com.google.protobuf.UnknownFieldSet.newBuilder();
try {
boolean done = false;
while (!done) {
int tag = input.readTag();
switch (tag) {
case 0:
done = true;
break;
default: {
if (!parseUnknownField(input, unknownFields,
extensionRegistry, tag)) {
done = true;
}
break;
}
case 10: {
if (!((mutable_bitField0_ & 0x00000001) == 0x00000001)) {
batch_ = new java.util.ArrayList<akka.persistence.serialization.MessageFormats.PersistentMessage>();
mutable_bitField0_ |= 0x00000001;
}
batch_.add(input.readMessage(akka.persistence.serialization.MessageFormats.PersistentMessage.PARSER, extensionRegistry));
break;
}
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
throw e.setUnfinishedMessage(this);
} catch (java.io.IOException e) {
throw new com.google.protobuf.InvalidProtocolBufferException(
e.getMessage()).setUnfinishedMessage(this);
} finally {
if (((mutable_bitField0_ & 0x00000001) == 0x00000001)) {
batch_ = java.util.Collections.unmodifiableList(batch_);
}
this.unknownFields = unknownFields.build();
makeExtensionsImmutable();
}
}
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return akka.persistence.serialization.MessageFormats.internal_static_PersistentMessageBatch_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return akka.persistence.serialization.MessageFormats.internal_static_PersistentMessageBatch_fieldAccessorTable
.ensureFieldAccessorsInitialized(
akka.persistence.serialization.MessageFormats.PersistentMessageBatch.class, akka.persistence.serialization.MessageFormats.PersistentMessageBatch.Builder.class);
}
public static com.google.protobuf.Parser<PersistentMessageBatch> PARSER =
new com.google.protobuf.AbstractParser<PersistentMessageBatch>() {
public PersistentMessageBatch parsePartialFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return new PersistentMessageBatch(input, extensionRegistry);
}
};
@java.lang.Override
public com.google.protobuf.Parser<PersistentMessageBatch> getParserForType() {
return PARSER;
}
// repeated .PersistentMessage batch = 1;
public static final int BATCH_FIELD_NUMBER = 1;
private java.util.List<akka.persistence.serialization.MessageFormats.PersistentMessage> batch_;
/**
* <code>repeated .PersistentMessage batch = 1;</code>
*/
public java.util.List<akka.persistence.serialization.MessageFormats.PersistentMessage> getBatchList() {
return batch_;
}
/**
* <code>repeated .PersistentMessage batch = 1;</code>
*/
public java.util.List<? extends akka.persistence.serialization.MessageFormats.PersistentMessageOrBuilder>
getBatchOrBuilderList() {
return batch_;
}
/**
* <code>repeated .PersistentMessage batch = 1;</code>
*/
public int getBatchCount() {
return batch_.size();
}
/**
* <code>repeated .PersistentMessage batch = 1;</code>
*/
public akka.persistence.serialization.MessageFormats.PersistentMessage getBatch(int index) {
return batch_.get(index);
}
/**
* <code>repeated .PersistentMessage batch = 1;</code>
*/
public akka.persistence.serialization.MessageFormats.PersistentMessageOrBuilder getBatchOrBuilder(
int index) {
return batch_.get(index);
}
private void initFields() {
batch_ = java.util.Collections.emptyList();
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
byte isInitialized = memoizedIsInitialized;
if (isInitialized != -1) return isInitialized == 1;
for (int i = 0; i < getBatchCount(); i++) {
if (!getBatch(i).isInitialized()) {
memoizedIsInitialized = 0;
return false;
}
}
memoizedIsInitialized = 1;
return true;
}
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
getSerializedSize();
for (int i = 0; i < batch_.size(); i++) {
output.writeMessage(1, batch_.get(i));
}
getUnknownFields().writeTo(output);
}
private int memoizedSerializedSize = -1;
public int getSerializedSize() {
int size = memoizedSerializedSize;
if (size != -1) return size;
size = 0;
for (int i = 0; i < batch_.size(); i++) {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(1, batch_.get(i));
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
}
private static final long serialVersionUID = 0L;
@java.lang.Override
protected java.lang.Object writeReplace()
throws java.io.ObjectStreamException {
return super.writeReplace();
}
public static akka.persistence.serialization.MessageFormats.PersistentMessageBatch parseFrom(
com.google.protobuf.ByteString data)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static akka.persistence.serialization.MessageFormats.PersistentMessageBatch parseFrom(
com.google.protobuf.ByteString data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static akka.persistence.serialization.MessageFormats.PersistentMessageBatch parseFrom(byte[] data)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static akka.persistence.serialization.MessageFormats.PersistentMessageBatch parseFrom(
byte[] data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static akka.persistence.serialization.MessageFormats.PersistentMessageBatch parseFrom(java.io.InputStream input)
throws java.io.IOException {
return PARSER.parseFrom(input);
}
public static akka.persistence.serialization.MessageFormats.PersistentMessageBatch parseFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseFrom(input, extensionRegistry);
}
public static akka.persistence.serialization.MessageFormats.PersistentMessageBatch parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
return PARSER.parseDelimitedFrom(input);
}
public static akka.persistence.serialization.MessageFormats.PersistentMessageBatch parseDelimitedFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseDelimitedFrom(input, extensionRegistry);
}
public static akka.persistence.serialization.MessageFormats.PersistentMessageBatch parseFrom(
com.google.protobuf.CodedInputStream input)
throws java.io.IOException {
return PARSER.parseFrom(input);
}
public static akka.persistence.serialization.MessageFormats.PersistentMessageBatch parseFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseFrom(input, extensionRegistry);
}
public static Builder newBuilder() { return Builder.create(); }
public Builder newBuilderForType() { return newBuilder(); }
public static Builder newBuilder(akka.persistence.serialization.MessageFormats.PersistentMessageBatch prototype) {
return newBuilder().mergeFrom(prototype);
}
public Builder toBuilder() { return newBuilder(this); }
@java.lang.Override
protected Builder newBuilderForType(
com.google.protobuf.GeneratedMessage.BuilderParent parent) {
Builder builder = new Builder(parent);
return builder;
}
/**
* Protobuf type {@code PersistentMessageBatch}
*/
public static final class Builder extends
com.google.protobuf.GeneratedMessage.Builder<Builder>
implements akka.persistence.serialization.MessageFormats.PersistentMessageBatchOrBuilder {
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return akka.persistence.serialization.MessageFormats.internal_static_PersistentMessageBatch_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return akka.persistence.serialization.MessageFormats.internal_static_PersistentMessageBatch_fieldAccessorTable
.ensureFieldAccessorsInitialized(
akka.persistence.serialization.MessageFormats.PersistentMessageBatch.class, akka.persistence.serialization.MessageFormats.PersistentMessageBatch.Builder.class);
}
// Construct using akka.persistence.serialization.MessageFormats.PersistentMessageBatch.newBuilder()
private Builder() {
maybeForceBuilderInitialization();
}
private Builder(
com.google.protobuf.GeneratedMessage.BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
private void maybeForceBuilderInitialization() {
if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
getBatchFieldBuilder();
}
}
private static Builder create() {
return new Builder();
}
public Builder clear() {
super.clear();
if (batchBuilder_ == null) {
batch_ = java.util.Collections.emptyList();
bitField0_ = (bitField0_ & ~0x00000001);
} else {
batchBuilder_.clear();
}
return this;
}
public Builder clone() {
return create().mergeFrom(buildPartial());
}
public com.google.protobuf.Descriptors.Descriptor
getDescriptorForType() {
return akka.persistence.serialization.MessageFormats.internal_static_PersistentMessageBatch_descriptor;
}
public akka.persistence.serialization.MessageFormats.PersistentMessageBatch getDefaultInstanceForType() {
return akka.persistence.serialization.MessageFormats.PersistentMessageBatch.getDefaultInstance();
}
public akka.persistence.serialization.MessageFormats.PersistentMessageBatch build() {
akka.persistence.serialization.MessageFormats.PersistentMessageBatch result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(result);
}
return result;
}
public akka.persistence.serialization.MessageFormats.PersistentMessageBatch buildPartial() {
akka.persistence.serialization.MessageFormats.PersistentMessageBatch result = new akka.persistence.serialization.MessageFormats.PersistentMessageBatch(this);
int from_bitField0_ = bitField0_;
if (batchBuilder_ == null) {
if (((bitField0_ & 0x00000001) == 0x00000001)) {
batch_ = java.util.Collections.unmodifiableList(batch_);
bitField0_ = (bitField0_ & ~0x00000001);
}
result.batch_ = batch_;
} else {
result.batch_ = batchBuilder_.build();
}
onBuilt();
return result;
}
public Builder mergeFrom(com.google.protobuf.Message other) {
if (other instanceof akka.persistence.serialization.MessageFormats.PersistentMessageBatch) {
return mergeFrom((akka.persistence.serialization.MessageFormats.PersistentMessageBatch)other);
} else {
super.mergeFrom(other);
return this;
}
}
public Builder mergeFrom(akka.persistence.serialization.MessageFormats.PersistentMessageBatch other) {
if (other == akka.persistence.serialization.MessageFormats.PersistentMessageBatch.getDefaultInstance()) return this;
if (batchBuilder_ == null) {
if (!other.batch_.isEmpty()) {
if (batch_.isEmpty()) {
batch_ = other.batch_;
bitField0_ = (bitField0_ & ~0x00000001);
} else {
ensureBatchIsMutable();
batch_.addAll(other.batch_);
}
onChanged();
}
} else {
if (!other.batch_.isEmpty()) {
if (batchBuilder_.isEmpty()) {
batchBuilder_.dispose();
batchBuilder_ = null;
batch_ = other.batch_;
bitField0_ = (bitField0_ & ~0x00000001);
batchBuilder_ =
com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders ?
getBatchFieldBuilder() : null;
} else {
batchBuilder_.addAllMessages(other.batch_);
}
}
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
public final boolean isInitialized() {
for (int i = 0; i < getBatchCount(); i++) {
if (!getBatch(i).isInitialized()) {
return false;
}
}
return true;
}
public Builder mergeFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
akka.persistence.serialization.MessageFormats.PersistentMessageBatch parsedMessage = null;
try {
parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
parsedMessage = (akka.persistence.serialization.MessageFormats.PersistentMessageBatch) e.getUnfinishedMessage();
throw e;
} finally {
if (parsedMessage != null) {
mergeFrom(parsedMessage);
}
}
return this;
}
private int bitField0_;
// repeated .PersistentMessage batch = 1;
private java.util.List<akka.persistence.serialization.MessageFormats.PersistentMessage> batch_ =
java.util.Collections.emptyList();
private void ensureBatchIsMutable() {
if (!((bitField0_ & 0x00000001) == 0x00000001)) {
batch_ = new java.util.ArrayList<akka.persistence.serialization.MessageFormats.PersistentMessage>(batch_);
bitField0_ |= 0x00000001;
}
}
private com.google.protobuf.RepeatedFieldBuilder<
akka.persistence.serialization.MessageFormats.PersistentMessage, akka.persistence.serialization.MessageFormats.PersistentMessage.Builder, akka.persistence.serialization.MessageFormats.PersistentMessageOrBuilder> batchBuilder_;
/**
* <code>repeated .PersistentMessage batch = 1;</code>
*/
public java.util.List<akka.persistence.serialization.MessageFormats.PersistentMessage> getBatchList() {
if (batchBuilder_ == null) {
return java.util.Collections.unmodifiableList(batch_);
} else {
return batchBuilder_.getMessageList();
}
}
/**
* <code>repeated .PersistentMessage batch = 1;</code>
*/
public int getBatchCount() {
if (batchBuilder_ == null) {
return batch_.size();
} else {
return batchBuilder_.getCount();
}
}
/**
* <code>repeated .PersistentMessage batch = 1;</code>
*/
public akka.persistence.serialization.MessageFormats.PersistentMessage getBatch(int index) {
if (batchBuilder_ == null) {
return batch_.get(index);
} else {
return batchBuilder_.getMessage(index);
}
}
/**
* <code>repeated .PersistentMessage batch = 1;</code>
*/
public Builder setBatch(
int index, akka.persistence.serialization.MessageFormats.PersistentMessage value) {
if (batchBuilder_ == null) {
if (value == null) {
throw new NullPointerException();
}
ensureBatchIsMutable();
batch_.set(index, value);
onChanged();
} else {
batchBuilder_.setMessage(index, value);
}
return this;
}
/**
* <code>repeated .PersistentMessage batch = 1;</code>
*/
public Builder setBatch(
int index, akka.persistence.serialization.MessageFormats.PersistentMessage.Builder builderForValue) {
if (batchBuilder_ == null) {
ensureBatchIsMutable();
batch_.set(index, builderForValue.build());
onChanged();
} else {
batchBuilder_.setMessage(index, builderForValue.build());
}
return this;
}
/**
* <code>repeated .PersistentMessage batch = 1;</code>
*/
public Builder addBatch(akka.persistence.serialization.MessageFormats.PersistentMessage value) {
if (batchBuilder_ == null) {
if (value == null) {
throw new NullPointerException();
}
ensureBatchIsMutable();
batch_.add(value);
onChanged();
} else {
batchBuilder_.addMessage(value);
}
return this;
}
/**
* <code>repeated .PersistentMessage batch = 1;</code>
*/
public Builder addBatch(
int index, akka.persistence.serialization.MessageFormats.PersistentMessage value) {
if (batchBuilder_ == null) {
if (value == null) {
throw new NullPointerException();
}
ensureBatchIsMutable();
batch_.add(index, value);
onChanged();
} else {
batchBuilder_.addMessage(index, value);
}
return this;
}
/**
* <code>repeated .PersistentMessage batch = 1;</code>
*/
public Builder addBatch(
akka.persistence.serialization.MessageFormats.PersistentMessage.Builder builderForValue) {
if (batchBuilder_ == null) {
ensureBatchIsMutable();
batch_.add(builderForValue.build());
onChanged();
} else {
batchBuilder_.addMessage(builderForValue.build());
}
return this;
}
/**
* <code>repeated .PersistentMessage batch = 1;</code>
*/
public Builder addBatch(
int index, akka.persistence.serialization.MessageFormats.PersistentMessage.Builder builderForValue) {
if (batchBuilder_ == null) {
ensureBatchIsMutable();
batch_.add(index, builderForValue.build());
onChanged();
} else {
batchBuilder_.addMessage(index, builderForValue.build());
}
return this;
}
/**
* <code>repeated .PersistentMessage batch = 1;</code>
*/
public Builder addAllBatch(
java.lang.Iterable<? extends akka.persistence.serialization.MessageFormats.PersistentMessage> values) {
if (batchBuilder_ == null) {
ensureBatchIsMutable();
super.addAll(values, batch_);
onChanged();
} else {
batchBuilder_.addAllMessages(values);
}
return this;
}
/**
* <code>repeated .PersistentMessage batch = 1;</code>
*/
public Builder clearBatch() {
if (batchBuilder_ == null) {
batch_ = java.util.Collections.emptyList();
bitField0_ = (bitField0_ & ~0x00000001);
onChanged();
} else {
batchBuilder_.clear();
}
return this;
}
/**
* <code>repeated .PersistentMessage batch = 1;</code>
*/
public Builder removeBatch(int index) {
if (batchBuilder_ == null) {
ensureBatchIsMutable();
batch_.remove(index);
onChanged();
} else {
batchBuilder_.remove(index);
}
return this;
}
/**
* <code>repeated .PersistentMessage batch = 1;</code>
*/
public akka.persistence.serialization.MessageFormats.PersistentMessage.Builder getBatchBuilder(
int index) {
return getBatchFieldBuilder().getBuilder(index);
}
/**
* <code>repeated .PersistentMessage batch = 1;</code>
*/
public akka.persistence.serialization.MessageFormats.PersistentMessageOrBuilder getBatchOrBuilder(
int index) {
if (batchBuilder_ == null) {
return batch_.get(index); } else {
return batchBuilder_.getMessageOrBuilder(index);
}
}
/**
* <code>repeated .PersistentMessage batch = 1;</code>
*/
public java.util.List<? extends akka.persistence.serialization.MessageFormats.PersistentMessageOrBuilder>
getBatchOrBuilderList() {
if (batchBuilder_ != null) {
return batchBuilder_.getMessageOrBuilderList();
} else {
return java.util.Collections.unmodifiableList(batch_);
}
}
/**
* <code>repeated .PersistentMessage batch = 1;</code>
*/
public akka.persistence.serialization.MessageFormats.PersistentMessage.Builder addBatchBuilder() {
return getBatchFieldBuilder().addBuilder(
akka.persistence.serialization.MessageFormats.PersistentMessage.getDefaultInstance());
}
/**
* <code>repeated .PersistentMessage batch = 1;</code>
*/
public akka.persistence.serialization.MessageFormats.PersistentMessage.Builder addBatchBuilder(
int index) {
return getBatchFieldBuilder().addBuilder(
index, akka.persistence.serialization.MessageFormats.PersistentMessage.getDefaultInstance());
}
/**
* <code>repeated .PersistentMessage batch = 1;</code>
*/
public java.util.List<akka.persistence.serialization.MessageFormats.PersistentMessage.Builder>
getBatchBuilderList() {
return getBatchFieldBuilder().getBuilderList();
}
private com.google.protobuf.RepeatedFieldBuilder<
akka.persistence.serialization.MessageFormats.PersistentMessage, akka.persistence.serialization.MessageFormats.PersistentMessage.Builder, akka.persistence.serialization.MessageFormats.PersistentMessageOrBuilder>
getBatchFieldBuilder() {
if (batchBuilder_ == null) {
batchBuilder_ = new com.google.protobuf.RepeatedFieldBuilder<
akka.persistence.serialization.MessageFormats.PersistentMessage, akka.persistence.serialization.MessageFormats.PersistentMessage.Builder, akka.persistence.serialization.MessageFormats.PersistentMessageOrBuilder>(
batch_,
((bitField0_ & 0x00000001) == 0x00000001),
getParentForChildren(),
isClean());
batch_ = null;
}
return batchBuilder_;
}
// @@protoc_insertion_point(builder_scope:PersistentMessageBatch)
}
static {
defaultInstance = new PersistentMessageBatch(true);
defaultInstance.initFields();
}
// @@protoc_insertion_point(class_scope:PersistentMessageBatch)
}
public interface PersistentMessageOrBuilder
extends com.google.protobuf.MessageOrBuilder {
@@ -3059,6 +3745,11 @@ public final class MessageFormats {
// @@protoc_insertion_point(class_scope:ConfirmMessage)
}
private static com.google.protobuf.Descriptors.Descriptor
internal_static_PersistentMessageBatch_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_PersistentMessageBatch_fieldAccessorTable;
private static com.google.protobuf.Descriptors.Descriptor
internal_static_PersistentMessage_descriptor;
private static
@@ -3083,38 +3774,46 @@ public final class MessageFormats {
descriptor;
static {
java.lang.String[] descriptorData = {
"\n\024MessageFormats.proto\"\371\001\n\021PersistentMes" +
"sage\022#\n\007payload\030\001 \001(\0132\022.PersistentPayloa" +
"d\022\022\n\nsequenceNr\030\002 \001(\003\022\023\n\013processorId\030\003 \001" +
"(\t\022\021\n\tchannelId\030\004 \001(\t\022\017\n\007deleted\030\005 \001(\010\022\020" +
"\n\010resolved\030\006 \001(\010\022\020\n\010confirms\030\010 \003(\t\022\'\n\016co" +
"nfirmMessage\030\n \001(\0132\017.ConfirmMessage\022\025\n\rc" +
"onfirmTarget\030\t \001(\t\022\016\n\006sender\030\007 \001(\t\"S\n\021Pe" +
"rsistentPayload\022\024\n\014serializerId\030\001 \002(\005\022\017\n" +
"\007payload\030\002 \002(\014\022\027\n\017payloadManifest\030\003 \001(\014\"" +
"L\n\016ConfirmMessage\022\023\n\013processorId\030\001 \001(\t\022\022",
"\n\nsequenceNr\030\002 \001(\003\022\021\n\tchannelId\030\003 \001(\tB\"\n" +
"\036akka.persistence.serializationH\001"
"\n\024MessageFormats.proto\";\n\026PersistentMess" +
"ageBatch\022!\n\005batch\030\001 \003(\0132\022.PersistentMess" +
"age\"\371\001\n\021PersistentMessage\022#\n\007payload\030\001 \001" +
"(\0132\022.PersistentPayload\022\022\n\nsequenceNr\030\002 \001" +
"(\003\022\023\n\013processorId\030\003 \001(\t\022\021\n\tchannelId\030\004 \001" +
"(\t\022\017\n\007deleted\030\005 \001(\010\022\020\n\010resolved\030\006 \001(\010\022\020\n" +
"\010confirms\030\010 \003(\t\022\'\n\016confirmMessage\030\n \001(\0132" +
"\017.ConfirmMessage\022\025\n\rconfirmTarget\030\t \001(\t\022" +
"\016\n\006sender\030\007 \001(\t\"S\n\021PersistentPayload\022\024\n\014" +
"serializerId\030\001 \002(\005\022\017\n\007payload\030\002 \002(\014\022\027\n\017p",
"ayloadManifest\030\003 \001(\014\"L\n\016ConfirmMessage\022\023" +
"\n\013processorId\030\001 \001(\t\022\022\n\nsequenceNr\030\002 \001(\003\022" +
"\021\n\tchannelId\030\003 \001(\tB\"\n\036akka.persistence.s" +
"erializationH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
public com.google.protobuf.ExtensionRegistry assignDescriptors(
com.google.protobuf.Descriptors.FileDescriptor root) {
descriptor = root;
internal_static_PersistentMessage_descriptor =
internal_static_PersistentMessageBatch_descriptor =
getDescriptor().getMessageTypes().get(0);
internal_static_PersistentMessageBatch_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_PersistentMessageBatch_descriptor,
new java.lang.String[] { "Batch", });
internal_static_PersistentMessage_descriptor =
getDescriptor().getMessageTypes().get(1);
internal_static_PersistentMessage_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_PersistentMessage_descriptor,
new java.lang.String[] { "Payload", "SequenceNr", "ProcessorId", "ChannelId", "Deleted", "Resolved", "Confirms", "ConfirmMessage", "ConfirmTarget", "Sender", });
internal_static_PersistentPayload_descriptor =
getDescriptor().getMessageTypes().get(1);
getDescriptor().getMessageTypes().get(2);
internal_static_PersistentPayload_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_PersistentPayload_descriptor,
new java.lang.String[] { "SerializerId", "Payload", "PayloadManifest", });
internal_static_ConfirmMessage_descriptor =
getDescriptor().getMessageTypes().get(2);
getDescriptor().getMessageTypes().get(3);
internal_static_ConfirmMessage_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_ConfirmMessage_descriptor,


@@ -5,6 +5,10 @@
option java_package = "akka.persistence.serialization";
option optimize_for = SPEED;
message PersistentMessageBatch {
repeated PersistentMessage batch = 1;
}
message PersistentMessage {
optional PersistentPayload payload = 1;
optional int64 sequenceNr = 2;

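For orientation, the classes generated from this new message can be exercised with a quick round-trip; a sketch (field values are arbitrary):

import akka.persistence.serialization.MessageFormats._

// Build a two-element batch, serialize it, and parse it back.
val batch = PersistentMessageBatch.newBuilder
  .addBatch(PersistentMessage.newBuilder.setSequenceNr(1L))
  .addBatch(PersistentMessage.newBuilder.setSequenceNr(2L))
  .build()
val parsed = PersistentMessageBatch.parseFrom(batch.toByteArray)
assert(parsed.getBatchCount == 2)
assert(parsed.getBatch(0).getSequenceNr == 1L)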

@@ -18,6 +18,7 @@ akka {
serialization-bindings {
"akka.persistence.serialization.Snapshot" = akka-persistence-snapshot
"akka.persistence.PersistentBatch" = akka-persistence-message
"akka.persistence.PersistentImpl" = akka-persistence-message
"akka.persistence.Confirm" = akka-persistence-message
}


@@ -21,20 +21,37 @@ private[persistence] trait Eventsourced extends Processor {
def aroundReceive(receive: Receive, message: Any): Unit
}
/**
* Processor recovery state. Waits for recovery completion and then changes to
* `processingCommands`
*/
private val recovering: State = new State {
def aroundReceive(receive: Receive, message: Any) {
Eventsourced.super.aroundReceive(receive, message)
message match {
case _: ReplaySuccess | _: ReplayFailure ⇒ currentState = processingCommands
case _ ⇒
}
}
}
/**
* Command processing state. If event persistence is pending after processing a
* command, event persistence is triggered and state changes to `persistingEvents`.
*
* There's no need to loop commands through the journal anymore, i.e. they can now be
* directly offered as `LoopSuccess` to the state machine implemented by `Processor`.
*/
private val processingCommands: State = new State {
def aroundReceive(receive: Receive, message: Any) = message match {
case m if (persistInvocations.isEmpty) ⇒ {
Eventsourced.super.aroundReceive(receive, m)
def aroundReceive(receive: Receive, message: Any) {
Eventsourced.super.aroundReceive(receive, LoopSuccess(message))
if (!persistInvocations.isEmpty) {
persistInvocations = persistInvocations.reverse
persistCandidates = persistCandidates.reverse
persistCandidates.foreach(self forward Persistent(_))
currentState = persistingEvents
}
Eventsourced.super.aroundReceive(receive, PersistentBatch(persistentEventBatch.reverse))
persistInvocations = persistInvocations.reverse
persistentEventBatch = Nil
} else {
processorStash.unstash()
}
}
}
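The State values above form a small state machine: aroundReceive is the single dispatch point, and transitions happen by reassigning currentState. Stripped of the persistence details, the pattern looks roughly like this (an illustrative sketch with invented names, not the actual trait):

import akka.actor.Actor

trait SimpleStateMachine { this: Actor ⇒
  trait State { def aroundReceive(receive: Receive, message: Any): Unit }

  // Replays until recovery completes, then switches to command handling.
  lazy val recovering: State = new State {
    def aroundReceive(receive: Receive, message: Any): Unit = {
      receive.applyOrElse(message, unhandled)
      if (message == "recovery-done") currentState = running // transition
    }
  }
  lazy val running: State = new State {
    def aroundReceive(receive: Receive, message: Any): Unit =
      receive.applyOrElse(message, unhandled)
  }

  private var currentState: State = recovering

  def dispatch(receive: Receive, message: Any): Unit =
    currentState.aroundReceive(receive, message)
}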
@@ -46,9 +63,13 @@ private[persistence] trait Eventsourced extends Processor {
*/
private val persistingEvents: State = new State {
def aroundReceive(receive: Receive, message: Any) = message match {
case p: PersistentImpl if identical(p.payload, persistCandidates.head) ⇒ {
Eventsourced.super.aroundReceive(receive, message)
persistCandidates = persistCandidates.tail
case PersistentBatch(b) ⇒ {
b.foreach(deleteMessage)
throw new UnsupportedOperationException("Persistent command batches not supported")
}
case p: PersistentImpl ⇒ {
deleteMessage(p)
throw new UnsupportedOperationException("Persistent commands not supported")
}
case WriteSuccess(p) if identical(p.payload, persistInvocations.head._1) ⇒ {
withCurrentPersistent(p)(p ⇒ persistInvocations.head._2(p.payload))
@@ -65,7 +86,7 @@ private[persistence] trait Eventsourced extends Processor {
persistInvocations = persistInvocations.tail
if (persistInvocations.isEmpty) {
currentState = processingCommands
processorStash.unstashAll()
processorStash.unstash()
}
}
@@ -74,9 +95,9 @@ private[persistence] trait Eventsourced extends Processor {
}
private var persistInvocations: List[(Any, Any ⇒ Unit)] = Nil
private var persistCandidates: List[Any] = Nil
private var persistentEventBatch: List[PersistentImpl] = Nil
private var currentState: State = processingCommands
private var currentState: State = recovering
private val processorStash = createProcessorStash
/**
@@ -103,7 +124,7 @@ private[persistence] trait Eventsourced extends Processor {
*/
final def persist[A](event: A)(handler: A ⇒ Unit): Unit = {
persistInvocations = (event, handler.asInstanceOf[Any ⇒ Unit]) :: persistInvocations
persistCandidates = event :: persistCandidates
persistentEventBatch = PersistentImpl(event) :: persistentEventBatch
}
/**
@@ -137,6 +158,13 @@ private[persistence] trait Eventsourced extends Processor {
*/
def receiveCommand: Receive
override def unstashAll() {
// Internally, all messages are processed by unstashing them from
// the internal stash one-by-one. Hence, an unstashAll() from the
// user stash must be prepended to the internal stash.
processorStash.prepend(clearStash())
}
/**
* INTERNAL API.
*/
@@ -144,6 +172,22 @@ private[persistence] trait Eventsourced extends Processor {
currentState.aroundReceive(receive, message)
}
/**
* Calls `super.preRestart` then unstashes all messages from the internal stash.
*/
override def preRestart(reason: Throwable, message: Option[Any]) {
super.preRestart(reason, message)
processorStash.unstashAll()
}
/**
* Calls `super.postStop` then unstashes all messages from the internal stash.
*/
override def postStop() {
super.postStop()
processorStash.unstashAll()
}
/**
* INTERNAL API.
*/
@@ -242,7 +286,9 @@ abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Events
* Command handler. Typically validates commands against current state (and/or by
* communication with other actors). On successful validation, one or more events are
* derived from a command and these events are then persisted by calling `persist`.
* Commands sent to event sourced processors should not be [[Persistent]] messages.
* Commands sent to event sourced processors must not be [[Persistent]] or
[[PersistentBatch]] messages. If they are, an `UnsupportedOperationException` is
thrown by the processor.
*/
def onReceiveCommand(msg: Any): Unit
}


@@ -4,6 +4,8 @@
package akka.persistence
import scala.collection.immutable
import akka.actor._
/**
@@ -18,6 +20,14 @@ private[persistence] object JournalProtocol {
*/
case class Delete(persistent: Persistent)
/**
* Instructs a journal to persist a sequence of messages.
*
* @param persistentBatch batch of messages to be persisted.
* @param processor requesting processor.
*/
case class WriteBatch(persistentBatch: immutable.Seq[PersistentImpl], processor: ActorRef)
/**
* Instructs a journal to persist a message.
*


@@ -4,9 +4,13 @@
package akka.persistence
import java.lang.{ Iterable ⇒ JIterable }
import java.util.{ List ⇒ JList }
import scala.collection.immutable
import akka.actor.ActorRef
import akka.japi.Util.immutableSeq
/**
* Persistent message.
@@ -78,6 +82,27 @@ object Persistent {
Some((persistent.payload, persistent.sequenceNr))
}
/**
* Instructs a [[Processor]] to atomically write the contained [[Persistent]] messages to the
* journal. The processor receives the written messages individually as [[Persistent]] messages.
* During recovery, they are also replayed individually.
*/
case class PersistentBatch(persistentBatch: immutable.Seq[Persistent]) {
/**
* INTERNAL API.
*/
private[persistence] def persistentImplList: List[PersistentImpl] =
persistentBatch.toList.asInstanceOf[List[PersistentImpl]]
}
object PersistentBatch {
/**
* JAVA API.
*/
def create(persistentBatch: JIterable[Persistent]) =
PersistentBatch(immutableSeq(persistentBatch))
}
/**
* Plugin API.
*
@@ -123,6 +148,9 @@ case class PersistentImpl(
* Java Plugin API.
*/
def getConfirms: JList[String] = confirms.asJava
private[persistence] def prepareWrite(sender: ActorRef) =
copy(sender = sender, resolved = false, confirmTarget = null, confirmMessage = null)
}
object PersistentImpl {


@@ -4,6 +4,8 @@
package akka.persistence
import scala.collection.immutable
import akka.actor._
import akka.dispatch._
@@ -49,6 +51,7 @@ import akka.dispatch._
*
* @see [[UntypedProcessor]]
* @see [[Recover]]
* @see [[PersistentBatch]]
*/
trait Processor extends Actor with Stash {
import JournalProtocol._
@@ -153,6 +156,7 @@ trait Processor extends Actor with Stash {
}
case LoopSuccess(m) ⇒ process(receive, m)
case p: PersistentImpl ⇒ journal forward Write(p.copy(processorId = processorId, sequenceNr = nextSequenceNr()), self)
case pb: PersistentBatch ⇒ journal forward WriteBatch(pb.persistentImplList.map(_.copy(processorId = processorId, sequenceNr = nextSequenceNr())), self)
case m ⇒ journal forward Loop(m, self)
}
}
@@ -362,6 +366,16 @@ trait Processor extends Actor with Stash {
def stash(): Unit =
theStash :+= currentEnvelope
def prepend(others: immutable.Seq[Envelope]): Unit =
others.reverseIterator.foreach(env ⇒ theStash = env +: theStash)
def unstash(): Unit = try {
if (theStash.nonEmpty) {
mailbox.enqueueFirst(self, theStash.head)
theStash = theStash.tail
}
}
def unstashAll(): Unit = try {
val i = theStash.reverseIterator
while (i.hasNext) mailbox.enqueueFirst(self, i.next())
@@ -377,7 +391,24 @@ trait Processor extends Actor with Stash {
* Processor specific stash used internally to avoid interference with user stash.
*/
private[persistence] trait ProcessorStash {
/**
* Appends the current message to this stash.
*/
def stash()
/**
* Prepends `others` to this stash.
*/
def prepend(others: immutable.Seq[Envelope])
/**
* Unstashes a single message from this stash.
*/
def unstash()
/**
* Unstashes all messages from this stash.
*/
def unstashAll()
}
@@ -434,6 +465,7 @@ private[persistence] trait ProcessorStash {
*
* @see [[Processor]]
* @see [[Recover]]
* @see [[PersistentBatch]]
*/
abstract class UntypedProcessor extends UntypedActor with Processor {


@@ -4,6 +4,7 @@
package akka.persistence.journal
import scala.collection.immutable
import scala.concurrent.Future
import scala.util._
@@ -11,7 +12,6 @@ import akka.actor._
import akka.pattern.{ pipe, PromiseActorRef }
import akka.persistence._
import akka.persistence.JournalProtocol._
import akka.serialization.Serialization
/**
* Abstract journal, optimized for asynchronous, non-blocking writes.
@@ -28,13 +28,26 @@ trait AsyncWriteJournal extends Actor with AsyncReplay {
val csdr = sender
val cctr = resequencerCounter
val psdr = if (sender.isInstanceOf[PromiseActorRef]) context.system.deadLetters else sender
writeAsync(persistent.copy(sender = psdr, resolved = false, confirmTarget = null, confirmMessage = null)) map {
writeAsync(persistent.prepareWrite(psdr)) map {
_ ⇒ Desequenced(WriteSuccess(persistent), cctr, processor, csdr)
} recover {
case e ⇒ Desequenced(WriteFailure(persistent, e), cctr, processor, csdr)
} pipeTo (resequencer)
resequencerCounter += 1
}
case WriteBatch(persistentBatch, processor) ⇒ {
val csdr = sender
val cctr = resequencerCounter
val psdr = if (sender.isInstanceOf[PromiseActorRef]) context.system.deadLetters else sender
def resequence(f: PersistentImpl ⇒ Any) = persistentBatch.zipWithIndex.foreach {
case (p, i) ⇒ resequencer ! Desequenced(f(p), cctr + i, processor, csdr)
}
writeBatchAsync(persistentBatch.map(_.prepareWrite(psdr))) onComplete {
case Success(_) ⇒ resequence(WriteSuccess(_))
case Failure(e) ⇒ resequence(WriteFailure(_, e))
}
resequencerCounter += persistentBatch.length
}
case Replay(fromSequenceNr, toSequenceNr, processorId, processor) ⇒ {
// Send replayed messages and replay result to processor directly. No need
// to resequence replayed messages relative to written and looped messages.
@@ -73,6 +86,14 @@ trait AsyncWriteJournal extends Actor with AsyncReplay {
*/
def writeAsync(persistent: PersistentImpl): Future[Unit]
/**
* Plugin API.
*
* Asynchronously writes a batch of persistent messages to the journal. The batch write
* must be atomic, i.e. either all persistent messages in the batch are written or none.
*/
def writeBatchAsync(persistentBatch: immutable.Seq[PersistentImpl]): Future[Unit]
/**
* Plugin API.
*

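The counter bookkeeping in the WriteBatch branch deserves a concrete trace: a batch of n messages claims n consecutive resequencer slots via cctr + i, and resequencerCounter then advances by the batch length, so replies to later writes cannot overtake the batch. An illustrative sketch of the slot arithmetic (not the actual Resequencer):

// Suppose resequencerCounter == 7 when a batch of 3 arrives.
val cctr = 7L
// The batch claims slots 7, 8, 9 (cctr + i for i = 0, 1, 2) ...
val batchSlots = (0 until 3).map(cctr + _)
// ... and the counter advances by the batch length, so the next
// single Write is desequenced at slot 10, strictly after the batch.
val nextSlot = cctr + 3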

@@ -4,12 +4,12 @@
package akka.persistence.journal
import scala.collection.immutable
import scala.util._
import akka.actor.Actor
import akka.pattern.{ pipe, PromiseActorRef }
import akka.persistence._
import akka.serialization.Serialization
/**
* Abstract journal, optimized for synchronous writes.
@@ -23,11 +23,18 @@ trait SyncWriteJournal extends Actor with AsyncReplay {
final def receive = {
case Write(persistent, processor) ⇒ {
val sdr = if (sender.isInstanceOf[PromiseActorRef]) context.system.deadLetters else sender
Try(write(persistent.copy(sender = sdr, resolved = false, confirmTarget = null, confirmMessage = null))) match {
Try(write(persistent.prepareWrite(sdr))) match {
case Success(_) ⇒ processor forward WriteSuccess(persistent)
case Failure(e) ⇒ processor forward WriteFailure(persistent, e); throw e
}
}
case WriteBatch(persistentBatch, processor) ⇒ {
val sdr = if (sender.isInstanceOf[PromiseActorRef]) context.system.deadLetters else sender
Try(writeBatch(persistentBatch.map(_.prepareWrite(sdr)))) match {
case Success(_) ⇒ persistentBatch.foreach(processor forward WriteSuccess(_))
case Failure(e) ⇒ persistentBatch.foreach(processor forward WriteFailure(_, e)); throw e
}
}
case Replay(fromSequenceNr, toSequenceNr, processorId, processor) ⇒ {
replayAsync(processorId, fromSequenceNr, toSequenceNr) { p ⇒
if (!p.deleted) processor.tell(Replayed(p), p.sender)
@@ -57,6 +64,14 @@ trait SyncWriteJournal extends Actor with AsyncReplay {
*/
def write(persistent: PersistentImpl): Unit
/**
* Plugin API.
*
* Synchronously writes a batch of persistent messages to the journal. The batch write
* must be atomic, i.e. either all persistent messages in the batch are written or none.
*/
def writeBatch(persistentBatch: immutable.Seq[PersistentImpl]): Unit
/**
* Plugin API.
*


@@ -4,6 +4,7 @@
package akka.persistence.journal.inmem
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.postfixOps
@@ -29,6 +30,9 @@ private[persistence] class InmemJournal extends AsyncWriteJournal {
def writeAsync(persistent: PersistentImpl): Future[Unit] =
(store ? Write(persistent)).mapTo[Unit]
def writeBatchAsync(persistentBatch: immutable.Seq[PersistentImpl]): Future[Unit] =
(store ? WriteBatch(persistentBatch)).mapTo[Unit]
def deleteAsync(persistent: PersistentImpl): Future[Unit] =
(store ? Delete(persistent)).mapTo[Unit]
@@ -47,6 +51,7 @@ private[persistence] class InmemStore extends Actor {
def receive = {
case Write(p) ⇒ add(p); success()
case WriteBatch(pb) ⇒ pb.foreach(add); success()
case Delete(p) ⇒ update(p.processorId, p.sequenceNr)(_.copy(deleted = true)); success()
case Confirm(pid, snr, cid) ⇒ update(pid, snr)(p ⇒ p.copy(confirms = cid +: p.confirms)); success()
case Replay(pid, fromSnr, toSnr, callback) ⇒ {
@@ -84,6 +89,7 @@ private[persistence] class InmemStore extends Actor {
private[persistence] object InmemStore {
case class Write(p: PersistentImpl)
case class WriteBatch(pb: Seq[PersistentImpl])
case class Delete(p: PersistentImpl)
case class Confirm(processorId: String, sequenceNr: Long, channelId: String)
case class Replay(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, replayCallback: (PersistentImpl) ⇒ Unit)


@@ -4,7 +4,8 @@
package akka.persistence.journal.japi
import scala.concurrent.Future
import scala.collection.immutable
import scala.collection.JavaConverters._
import akka.persistence.journal.{ AsyncWriteJournal ⇒ SAsyncWriteJournal }
import akka.persistence.PersistentImpl
@@ -20,6 +21,9 @@ abstract class AsyncWriteJournal extends AsyncReplay with SAsyncWriteJournal wit
final def writeAsync(persistent: PersistentImpl) =
doWriteAsync(persistent).map(Unit.unbox)
final def writeBatchAsync(persistentBatch: immutable.Seq[PersistentImpl]) =
doWriteBatchAsync(persistentBatch.asJava).map(Unit.unbox)
final def deleteAsync(persistent: PersistentImpl) =
doDeleteAsync(persistent).map(Unit.unbox)


@@ -4,6 +4,9 @@
package akka.persistence.journal.japi
import scala.collection.immutable
import scala.collection.JavaConverters._
import akka.persistence.journal.{ SyncWriteJournal ⇒ SSyncWriteJournal }
import akka.persistence.PersistentImpl
@@ -16,6 +19,9 @@ abstract class SyncWriteJournal extends AsyncReplay with SSyncWriteJournal with
final def write(persistent: PersistentImpl) =
doWrite(persistent)
final def writeBatch(persistentBatch: immutable.Seq[PersistentImpl]) =
doWriteBatch(persistentBatch.asJava)
final def delete(persistent: PersistentImpl) =
doDelete(persistent)


@@ -6,6 +6,8 @@ package akka.persistence.journal.leveldb
import java.io.File
import scala.collection.immutable
import org.iq80.leveldb._
import akka.persistence._
@@ -36,11 +38,11 @@ private[leveldb] class LeveldbJournal extends SyncWriteJournal with LeveldbIdMap
import Key._
def write(persistent: PersistentImpl) = withBatch { batch ⇒
val nid = numericId(persistent.processorId)
batch.put(keyToBytes(counterKey(nid)), counterToBytes(persistent.sequenceNr))
batch.put(keyToBytes(Key(nid, persistent.sequenceNr, 0)), persistentToBytes(persistent))
}
def write(persistent: PersistentImpl) =
withBatch(batch ⇒ addToBatch(persistent, batch))
def writeBatch(persistentBatch: immutable.Seq[PersistentImpl]) =
withBatch(batch ⇒ persistentBatch.foreach(persistent ⇒ addToBatch(persistent, batch)))
def delete(persistent: PersistentImpl) {
leveldb.put(keyToBytes(deletionKey(numericId(persistent.processorId), persistent.sequenceNr)), Array.empty[Byte])
@@ -56,6 +58,12 @@ private[leveldb] class LeveldbJournal extends SyncWriteJournal with LeveldbIdMap
def persistentToBytes(p: PersistentImpl): Array[Byte] = serialization.serialize(p).get
def persistentFromBytes(a: Array[Byte]): PersistentImpl = serialization.deserialize(a, classOf[PersistentImpl]).get
private def addToBatch(persistent: PersistentImpl, batch: WriteBatch): Unit = {
val nid = numericId(persistent.processorId)
batch.put(keyToBytes(counterKey(nid)), counterToBytes(persistent.sequenceNr))
batch.put(keyToBytes(Key(nid, persistent.sequenceNr, 0)), persistentToBytes(persistent))
}
private def withBatch[R](body: WriteBatch ⇒ R): R = {
val batch = leveldb.createWriteBatch()
try {

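The withBatch helper is cut off above; it follows the usual org.iq80.leveldb write-batch idiom: create a batch, let the body fill it, commit it as one atomic write, and always close it. In isolation the idiom looks roughly like this (a sketch, assuming a DB handle):

import org.iq80.leveldb.{ DB, WriteBatch }

// Apply `body` to a fresh WriteBatch and commit it as a single atomic write.
def withBatch[R](leveldb: DB)(body: WriteBatch ⇒ R): R = {
  val batch = leveldb.createWriteBatch()
  try {
    val r = body(batch)
    leveldb.write(batch) // all puts in the batch become visible atomically
    r
  } finally {
    batch.close()
  }
}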

@@ -9,6 +9,7 @@ import scala.language.existentials
import com.google.protobuf._
import akka.actor.ExtendedActorSystem
import akka.japi.Util.immutableSeq
import akka.persistence._
import akka.persistence.serialization.MessageFormats._
import akka.serialization._
@@ -19,6 +20,7 @@ import akka.serialization._
class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
import PersistentImpl.Undefined
val PersistentBatchClass = classOf[PersistentBatch]
val PersistentClass = classOf[PersistentImpl]
val ConfirmClass = classOf[Confirm]
@@ -26,22 +28,24 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
def includeManifest: Boolean = true
/**
* Serializes a [[Persistent]] message. Delegates serialization of the persistent message's
* payload to a matching `akka.serialization.Serializer`.
* Serializes [[PersistentBatch]] and [[Persistent]]. Delegates serialization of a
* persistent message's payload to a matching `akka.serialization.Serializer`.
*/
def toBinary(o: AnyRef): Array[Byte] = o match {
case b: PersistentBatch ⇒ persistentMessageBatchBuilder(b).build().toByteArray
case p: PersistentImpl ⇒ persistentMessageBuilder(p).build().toByteArray
case c: Confirm ⇒ confirmMessageBuilder(c).build().toByteArray
case _ ⇒ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}")
}
/**
* Deserializes a [[Persistent]] message. Delegates deserialization of the persistent message's
* payload to a matching `akka.serialization.Serializer`.
* Deserializes [[PersistentBatch]] and [[Persistent]]. Delegates deserialization of a
* persistent message's payload to a matching `akka.serialization.Serializer`.
*/
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = manifest match {
case None ⇒ persistent(PersistentMessage.parseFrom(bytes))
case Some(c) ⇒ c match {
case PersistentBatchClass ⇒ persistentBatch(PersistentMessageBatch.parseFrom(bytes))
case PersistentClass ⇒ persistent(PersistentMessage.parseFrom(bytes))
case ConfirmClass ⇒ confirm(ConfirmMessage.parseFrom(bytes))
case _ ⇒ throw new IllegalArgumentException(s"Can't deserialize object of type ${c}")
@@ -52,6 +56,12 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
// toBinary helpers
//
private def persistentMessageBatchBuilder(persistentBatch: PersistentBatch) = {
val builder = PersistentMessageBatch.newBuilder
persistentBatch.persistentImplList.foreach(p ⇒ builder.addBatch(persistentMessageBuilder(p)))
builder
}
private def persistentMessageBuilder(persistent: PersistentImpl) = {
val builder = PersistentMessage.newBuilder
@@ -92,8 +102,10 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
// fromBinary helpers
//
private def persistentBatch(persistentMessageBatch: PersistentMessageBatch): PersistentBatch =
PersistentBatch(immutableSeq(persistentMessageBatch.getBatchList).map(persistent))
private def persistent(persistentMessage: PersistentMessage): PersistentImpl = {
import scala.collection.JavaConverters._
PersistentImpl(
payload(persistentMessage.getPayload),
persistentMessage.getSequenceNr,
@@ -101,7 +113,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
if (persistentMessage.hasChannelId) persistentMessage.getChannelId else Undefined,
persistentMessage.getDeleted,
persistentMessage.getResolved,
persistentMessage.getConfirmsList.asScala.toList,
immutableSeq(persistentMessage.getConfirmsList),
if (persistentMessage.hasConfirmMessage) confirm(persistentMessage.getConfirmMessage) else null,
if (persistentMessage.hasConfirmTarget) system.provider.resolveActorRef(persistentMessage.getConfirmTarget) else null,
if (persistentMessage.hasSender) system.provider.resolveActorRef(persistentMessage.getSender) else null)

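Together with the new akka.persistence.PersistentBatch binding in reference.conf, the serializer can be exercised end-to-end through the serialization extension; a sketch (system name arbitrary):

import akka.actor.ActorSystem
import akka.persistence.{ Persistent, PersistentBatch }
import akka.serialization.SerializationExtension

val system = ActorSystem("serializer-example")
val serialization = SerializationExtension(system)

val batch = PersistentBatch(List(Persistent("a"), Persistent("b")))
// Resolves MessageSerializer via the akka-persistence-message binding.
val serializer = serialization.findSerializerFor(batch)
val bytes = serializer.toBinary(batch)
val restored = serializer.fromBinary(bytes, Some(classOf[PersistentBatch]))

system.shutdown()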

@@ -23,7 +23,7 @@ object EventsourcedSpec {
}
val commonBehavior: Receive = {
case "boom" ⇒ throw new Exception("boom")
case "boom" ⇒ throw new TestException("boom")
case GetState ⇒ sender ! events.reverse
}
@@ -164,6 +164,51 @@ object EventsourcedSpec {
}
}
class UserStashManyProcessor(name: String) extends ExampleProcessor(name) {
val receiveCommand: Receive = commonBehavior orElse {
case Cmd("a") ⇒ persist(Evt("a")) { evt ⇒
updateState(evt)
context.become(processC)
}
case Cmd("b-1") ⇒ persist(Evt("b-1"))(updateState)
case Cmd("b-2") ⇒ persist(Evt("b-2"))(updateState)
}
val processC: Receive = {
case Cmd("c") ⇒ {
persist(Evt("c")) { evt ⇒
updateState(evt)
context.unbecome()
}
unstashAll()
}
case other ⇒ stash()
}
}
class UserStashFailureProcessor(name: String) extends ExampleProcessor(name) {
val receiveCommand: Receive = commonBehavior orElse {
case Cmd(data) ⇒ {
if (data == "b-2") throw new TestException("boom")
persist(Evt(data)) { event ⇒
updateState(event)
if (data == "a") context.become(otherCommandHandler)
}
}
}
val otherCommandHandler: Receive = {
case Cmd("c") ⇒ {
persist(Evt("c")) { event ⇒
updateState(event)
context.unbecome()
}
unstashAll()
}
case other ⇒ stash()
}
}
class AnyValEventProcessor(name: String) extends ExampleProcessor(name) {
val receiveCommand: Receive = {
case Cmd("a") ⇒ persist(5)(evt ⇒ sender ! evt)
@@ -272,7 +317,7 @@ abstract class EventsourcedSpec(config: Config) extends AkkaSpec(config) with Pe
processor ! Cmd("a")
expectMsg("a")
}
"not interfere with the user stash" in {
"support user stash operations" in {
val processor = namedProcessor[UserStashProcessor]
processor ! Cmd("a")
processor ! Cmd("b")
@@ -281,6 +326,25 @@
expectMsg("c")
expectMsg("a")
}
"support user stash operations with several stashed messages" in {
val processor = namedProcessor[UserStashManyProcessor]
val n = 10
val cmds = 1 to n flatMap (_ ⇒ List(Cmd("a"), Cmd("b-1"), Cmd("b-2"), Cmd("c")))
val evts = 1 to n flatMap (_ ⇒ List("a", "c", "b-1", "b-2"))
cmds foreach (processor ! _)
processor ! GetState
expectMsg((List("a-1", "a-2") ++ evts))
}
"support user stash operations under failures" in {
val processor = namedProcessor[UserStashFailureProcessor]
val bs = 1 to 10 map ("b-" + _)
processor ! Cmd("a")
bs foreach (processor ! Cmd(_))
processor ! Cmd("c")
processor ! GetState
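// "b-2" throws before persist and is never journaled; the remaining stashed
// commands survive the restart and are persisted after "c".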
expectMsg(List("a-1", "a-2", "a", "c") ++ bs.filter(_ != "b-2"))
}
"be able to persist events that extend AnyVal" in {
val processor = namedProcessor[AnyValEventProcessor]
processor ! Cmd("a")

View file

@@ -0,0 +1,163 @@
package akka.persistence
import scala.concurrent.duration._
import scala.language.postfixOps
import com.typesafe.config.ConfigFactory
import akka.actor._
import akka.testkit._
object PerformanceSpec {
// multiply cycles by 100 for more
// accurate throughput measurements
val config =
"""
akka.persistence.performance.cycles.warmup = 300
akka.persistence.performance.cycles.load = 1000
"""
case object StartMeasure
case object StopMeasure
case class FailAt(sequenceNr: Long)
abstract class PerformanceTestProcessor(name: String) extends NamedProcessor(name) {
val NanoToSecond = 1000.0 * 1000 * 1000
var startTime: Long = 0L
var stopTime: Long = 0L
var startSequenceNr = 0L
var stopSequenceNr = 0L
var failAt: Long = -1
val controlBehavior: Receive = {
case StartMeasure ⇒ {
startSequenceNr = lastSequenceNr
startTime = System.nanoTime
}
case StopMeasure ⇒ {
stopSequenceNr = lastSequenceNr
stopTime = System.nanoTime
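// throughput in messages per second, e.g. 50000 messages measured over
// 2.5e9 ns gives 1.0e9 * 50000 / 2.5e9 = 20000 msg/s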
sender ! (NanoToSecond * (stopSequenceNr - startSequenceNr) / (stopTime - startTime))
}
case FailAt(sequenceNr) ⇒ failAt = sequenceNr
}
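// Re-arm the measurement after a failure-induced restart so that the
// throughput numbers remain meaningful across restarts.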
override def postRestart(reason: Throwable) {
super.postRestart(reason)
receive(StartMeasure)
}
}
class CommandsourcedTestProcessor(name: String) extends PerformanceTestProcessor(name) {
def receive = controlBehavior orElse {
case p: Persistent ⇒ {
if (lastSequenceNr % 1000 == 0) if (recoveryRunning) print("r") else print(".")
if (lastSequenceNr == failAt) throw new TestException("boom")
}
}
}
class EventsourcedTestProcessor(name: String) extends PerformanceTestProcessor(name) with EventsourcedProcessor {
val receiveReplay: Receive = {
case _ ⇒ if (lastSequenceNr % 1000 == 0) print("r")
}
val receiveCommand: Receive = controlBehavior orElse {
case cmd ⇒ persist(cmd) { _ ⇒
if (lastSequenceNr % 1000 == 0) print(".")
if (lastSequenceNr == failAt) throw new TestException("boom")
}
}
}
class StashingEventsourcedTestProcessor(name: String) extends PerformanceTestProcessor(name) with EventsourcedProcessor {
val receiveReplay: Receive = {
case _ ⇒ if (lastSequenceNr % 1000 == 0) print("r")
}
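// Total pass-through function: prints a progress marker and returns the
// message unchanged, so it composes with the handlers below via andThen.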
val printProgress: PartialFunction[Any, Any] = {
case m ⇒ if (lastSequenceNr % 1000 == 0) print("."); m
}
val receiveCommand: Receive = printProgress andThen (controlBehavior orElse {
case "a" persist("a")(_ context.become(processC))
case "b" persist("b")(_ ())
})
val processC: Receive = printProgress andThen {
case "c" {
persist("c")(_ context.unbecome())
unstashAll()
}
case other ⇒ stash()
}
}
}
class PerformanceSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "performance")
  .withFallback(ConfigFactory.parseString(PerformanceSpec.config))) with PersistenceSpec with ImplicitSender {
import PerformanceSpec._
val warmupCycles = system.settings.config.getInt("akka.persistence.performance.cycles.warmup")
val loadCycles = system.settings.config.getInt("akka.persistence.performance.cycles.load")
def stressCommandsourcedProcessor(failAt: Option[Long]): Unit = {
val processor = namedProcessor[CommandsourcedTestProcessor]
failAt foreach { processor ! FailAt(_) }
1 to warmupCycles foreach { i ⇒ processor ! Persistent(s"msg${i}") }
processor ! StartMeasure
1 to loadCycles foreach { i ⇒ processor ! Persistent(s"msg${i}") }
processor ! StopMeasure
expectMsgPF(100 seconds) {
case throughput: Double ⇒ println(f"\nthroughput = $throughput%.2f persistent commands per second")
}
}
def stressEventsourcedProcessor(failAt: Option[Long]): Unit = {
val processor = namedProcessor[EventsourcedTestProcessor]
failAt foreach { processor ! FailAt(_) }
1 to warmupClycles foreach { i processor ! s"msg${i}" }
processor ! StartMeasure
1 to loadCycles foreach { i processor ! s"msg${i}" }
processor ! StopMeasure
expectMsgPF(100 seconds) {
case throughput: Double ⇒ println(f"\nthroughput = $throughput%.2f persistent events per second")
}
}
def stressStashingEventsourcedProcessor(): Unit = {
val processor = namedProcessor[StashingEventsourcedTestProcessor]
1 to warmupClycles foreach { i processor ! "b" }
processor ! StartMeasure
val cmds = 1 to (loadCycles / 3) flatMap (_ List("a", "b", "c"))
processor ! StartMeasure
cmds foreach (processor ! _)
processor ! StopMeasure
expectMsgPF(100 seconds) {
case throughput: Double ⇒ println(f"\nthroughput = $throughput%.2f persistent events per second")
}
}
"A command sourced processor" should {
"have some reasonable throughput" in {
stressCommandsourcedProcessor(None)
}
"have some reasonable throughput under failure conditions" in {
stressCommandsourcedProcessor(Some(warmupCycles + loadCycles / 10))
}
}
"An event sourced processor" should {
"have some reasonable throughput" in {
stressEventsourcedProcessor(None)
}
"have some reasonable throughput under failure conditions" in {
stressEventsourcedProcessor(Some(warmupCycles + loadCycles / 10))
}
"have some reasonable throughput with stashing and unstashing every 3rd command" in {
stressStashingEventsourcedProcessor()
}
}
}
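The warmup and load cycle counts are kept deliberately small so the spec runs fast; as the comment in PerformanceSpec notes, larger counts give more accurate throughput numbers. A sketch of how a heavier run could be configured (the values here are arbitrary):

import com.typesafe.config.ConfigFactory

// Layer heavier cycle counts over the spec's defaults.
val heavyLoad = ConfigFactory.parseString(
  """
  akka.persistence.performance.cycles.warmup = 30000
  akka.persistence.performance.cycles.load = 100000
  """).withFallback(ConfigFactory.parseString(PerformanceSpec.config))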

View file

@@ -8,6 +8,7 @@ import java.io.File
import java.util.concurrent.atomic.AtomicInteger
import scala.reflect.ClassTag
import scala.util.control.NoStackTrace
import com.typesafe.config.ConfigFactory
@@ -53,12 +54,12 @@ trait PersistenceSpec extends BeforeAndAfterEach { this: AkkaSpec ⇒
object PersistenceSpec {
def config(plugin: String, test: String) = ConfigFactory.parseString(
s"""
|serialize-creators = on
|serialize-messages = on
|akka.persistence.journal.plugin = "akka.persistence.journal.${plugin}"
|akka.persistence.journal.leveldb.dir = "target/journal-${test}-spec"
|akka.persistence.snapshot-store.local.dir = "target/snapshots-${test}-spec/"
""".stripMargin)
serialize-creators = on
serialize-messages = on
akka.persistence.journal.plugin = "akka.persistence.journal.${plugin}"
akka.persistence.journal.leveldb.dir = "target/journal-${test}-spec"
akka.persistence.snapshot-store.local.dir = "target/snapshots-${test}-spec/"
""")
}
abstract class NamedProcessor(name: String) extends Processor {
@@ -69,4 +70,6 @@ trait TurnOffRecoverOnStart { this: Processor ⇒
override def preStart(): Unit = ()
}
class TestException(msg: String) extends Exception(msg) with NoStackTrace
case object GetState

View file

@@ -4,6 +4,8 @@
package akka.persistence
import scala.collection.immutable.Seq
import com.typesafe.config._
import akka.actor._
@@ -13,8 +15,8 @@ object ProcessorSpec {
class RecoverTestProcessor(name: String) extends NamedProcessor(name) {
var state = List.empty[String]
def receive = {
case "boom" throw new Exception("boom")
case Persistent("boom", _) throw new Exception("boom")
case "boom" throw new TestException("boom")
case Persistent("boom", _) throw new TestException("boom")
case Persistent(payload, snr) state = s"${payload}-${snr}" :: state
case GetState sender ! state.reverse
}
@@ -83,7 +85,7 @@ object ProcessorSpec {
}
}
class ResumeTestException extends Exception("test")
class ResumeTestException extends TestException("test")
class ResumeTestSupervisor(name: String) extends Actor {
val processor = context.actorOf(Props(classOf[ResumeTestProcessor], name))
@@ -119,7 +121,7 @@ object ProcessorSpec {
class AnyReplayedMsgFailsTestProcessor(name: String) extends RecoverTestProcessor(name) {
val failOnReplayedA: Actor.Receive = {
case Persistent("a", _) if recoveryRunning throw new Exception("boom")
case Persistent("a", _) if recoveryRunning throw new TestException("boom")
}
override def receive = failOnReplayedA orElse super.receive
@@ -283,6 +285,13 @@ abstract class ProcessorSpec(config: Config) extends AkkaSpec(config) with Persi
processor ! GetState
expectMsg(List("b-1", "c-3", "d-4", "e-5", "f-6", "g-7", "h-8", "i-9", "j-10"))
}
"support batch writes" in {
val processor = namedProcessor[RecoverTestProcessor]
processor ! PersistentBatch(Seq(Persistent("c"), Persistent("d"), Persistent("e")))
processor ! Persistent("f")
processor ! GetState
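// "a-1" and "b-2" presumably stem from messages journaled by earlier tests
// in this spec; the batch then continues at sequence number 3.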
expectMsg(List("a-1", "b-2", "c-3", "d-4", "e-5", "f-6"))
}
}
"A processor" can {

View file

@@ -18,8 +18,8 @@ object ProcessorStashSpec {
case Persistent("b", snr) update("b", snr)
case Persistent("c", snr) update("c", snr); unstashAll()
case "x" update("x")
case "boom" throw new Exception("boom")
case Persistent("boom", _) throw new Exception("boom")
case "boom" throw new TestException("boom")
case Persistent("boom", _) throw new TestException("boom")
case GetState sender ! state.reverse
}

View file

@@ -1,5 +1,7 @@
package akka.persistence.serialization
import scala.collection.immutable
import com.typesafe.config._
import akka.actor._
@@ -118,6 +120,7 @@ object MessageSerializerRemotingSpec {
class RemoteActor extends Actor {
def receive = {
case PersistentBatch(Persistent(MyPayload(data), _) +: tail) ⇒ sender ! data
case Persistent(MyPayload(data), _) ⇒ sender ! data
case Confirm(pid, snr, cid) ⇒ sender ! s"${pid},${snr},${cid}"
}
@@ -147,10 +150,13 @@ class MessageSerializerRemotingSpec extends AkkaSpec(config(systemA).withFallbac
localActor ! Persistent(MyPayload("a"))
expectMsg(".a.")
}
"custom-serialize persistent message batches during remoting" in {
localActor ! PersistentBatch(immutable.Seq(Persistent(MyPayload("a"))))
expectMsg(".a.")
}
"serialize confirmation messages during remoting" in {
localActor ! Confirm("a", 2, "b")
expectMsg("a,2,b")
}
}
}