diff --git a/akka-actor/src/main/scala/akka/actor/Stash.scala b/akka-actor/src/main/scala/akka/actor/Stash.scala index 263c5218ad..60bec667a2 100644 --- a/akka-actor/src/main/scala/akka/actor/Stash.scala +++ b/akka-actor/src/main/scala/akka/actor/Stash.scala @@ -83,7 +83,7 @@ trait UnrestrictedStash extends Actor { * The actor's deque-based message queue. * `mailbox.queue` is the underlying `Deque`. */ - protected[akka] val mailbox: DequeBasedMessageQueueSemantics = { + private[akka] val mailbox: DequeBasedMessageQueueSemantics = { context.asInstanceOf[ActorCell].mailbox.messageQueue match { case queue: DequeBasedMessageQueueSemantics ⇒ queue case other ⇒ throw ActorInitializationException(self, s"DequeBasedMailbox required, got: ${other.getClass.getName}\n" + @@ -136,7 +136,7 @@ trait UnrestrictedStash extends Actor { * @param filterPredicate only stashed messages selected by this predicate are * prepended to the mailbox. */ - protected[akka] def unstashAll(filterPredicate: Any ⇒ Boolean): Unit = { + private[akka] def unstashAll(filterPredicate: Any ⇒ Boolean): Unit = { try { val i = theStash.reverseIterator.filter(envelope ⇒ filterPredicate(envelope.message)) while (i.hasNext) mailbox.enqueueFirst(self, i.next()) @@ -145,6 +145,17 @@ trait UnrestrictedStash extends Actor { } } + /** + * INTERNAL API. + * + * Clears the stash and and returns all envelopes that have not been unstashed. + */ + private[akka] def clearStash(): Vector[Envelope] = { + val stashed = theStash + theStash = Vector.empty[Envelope] + stashed + } + /** * Overridden callback. Prepends all messages in the stash to the mailbox, * clears the stash, stops all children and invokes the postStop() callback. diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java index 955c067f51..59b8e7e159 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceDocTest.java @@ -9,6 +9,8 @@ import scala.Option; import akka.actor.*; import akka.persistence.*; +import static java.util.Arrays.asList; + public class PersistenceDocTest { public interface ProcessorMethods { @@ -237,4 +239,31 @@ public class PersistenceDocTest { } } }; + + static Object o6 = new Object() { + //#batch-write + class MyProcessor extends UntypedProcessor { + public void onReceive(Object message) throws Exception { + if (message instanceof Persistent) { + Persistent p = (Persistent)message; + if (p.payload().equals("a")) { /* ... */ } + if (p.payload().equals("b")) { /* ... */ } + } + } + } + + class Example { + final ActorSystem system = ActorSystem.create("example"); + final ActorRef processor = system.actorOf(Props.create(MyProcessor.class)); + + public void batchWrite() { + processor.tell(PersistentBatch.create(asList( + Persistent.create("a"), + Persistent.create("b"))), null); + } + + // ... + } + //#batch-write + }; } diff --git a/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java index 2b32a78746..78caaeb8fe 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java @@ -45,6 +45,11 @@ public class PersistencePluginDocTest { return null; } + @Override + public Future doWriteBatchAsync(Iterable persistentBatch) { + return null; + } + @Override public Future doDeleteAsync(PersistentImpl persistent) { return null; diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 17e929f1f7..01d1ca0ac6 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -265,6 +265,8 @@ If not specified, they default to ``SnapshotSelectionCriteria.latest()`` which s To disable snapshot-based recovery, applications should use ``SnapshotSelectionCriteria.none()``. A recovery where no saved snapshot matches the specified ``SnapshotSelectionCriteria`` will replay all journaled messages. +.. _event-sourcing-java: + Event sourcing ============== @@ -318,6 +320,20 @@ The example also demonstrates how to change the processor's default behavior, de another behavior, defined by ``otherCommandHandler``, and back using ``getContext().become()`` and ``getContext().unbecome()``. See also the API docs of ``persist`` for further details. +Batch writes +============ + +Applications may also send a batch of ``Persistent`` messages to a processor via a ``PersistentBatch`` message. + +.. includecode:: code/docs/persistence/PersistenceDocTest.java#batch-write + +``Persistent`` messages contained in a ``PersistentBatch`` message are written to the journal atomically but are +received by the processor separately (as ``Persistent`` messages). They are also replayed separately. Batch writes +can not only increase the throughput of a processor but may also be necessary for consistency reasons. For example, +in :ref:`event-sourcing-java`, all events that are generated and persisted by a single command are batch-written to +the journal. The recovery of an ``UntypedEventsourcedProcessor`` will therefore never be done partially i.e. with +only a subset of events persisted by a single command. + Storage plugins =============== diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala index a54ccdaac6..a732b1025c 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala @@ -225,4 +225,22 @@ trait PersistenceDocSpec { maxTimestamp = System.currentTimeMillis)) //#snapshot-criteria } + + new AnyRef { + import akka.actor.Props + //#batch-write + class MyProcessor extends Processor { + def receive = { + case Persistent("a", _) ⇒ // ... + case Persistent("b", _) ⇒ // ... + } + } + + val system = ActorSystem("example") + val processor = system.actorOf(Props[MyProcessor]) + + processor ! PersistentBatch(Vector(Persistent("a"), Persistent("b"))) + //#batch-write + system.shutdown() + } } diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala index 0645619aa6..22e693b933 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala @@ -6,6 +6,7 @@ package docs.persistence //#plugin-imports import scala.concurrent.Future +import scala.collection.immutable.Seq //#plugin-imports import com.typesafe.config._ @@ -69,6 +70,7 @@ class PersistencePluginDocSpec extends WordSpec { class MyJournal extends AsyncWriteJournal { def writeAsync(persistent: PersistentImpl): Future[Unit] = ??? + def writeBatchAsync(persistentBatch: Seq[PersistentImpl]): Future[Unit] = ??? def deleteAsync(persistent: PersistentImpl): Future[Unit] = ??? def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit] = ??? def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: (PersistentImpl) ⇒ Unit): Future[Long] = ??? diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 20fc1318db..9f61d2b4a5 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -331,6 +331,20 @@ The example also demonstrates how to change the processor's default behavior, de another behavior, defined by ``otherCommandHandler``, and back using ``context.become()`` and ``context.unbecome()``. See also the API docs of ``persist`` for further details. +Batch writes +============ + +Applications may also send a batch of ``Persistent`` messages to a processor via a ``PersistentBatch`` message. + +.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#batch-write + +``Persistent`` messages contained in a ``PersistentBatch`` message are written to the journal atomically but are +received by the processor separately (as ``Persistent`` messages). They are also replayed separately. Batch writes +can not only increase the throughput of a processor but may also be necessary for consistency reasons. For example, +in :ref:`event-sourcing`, all events that are generated and persisted by a single command are batch-written to the +journal. The recovery of an ``EventsourcedProcessor`` will therefore never be done partially i.e. with only a subset +of events persisted by a single command. + Storage plugins =============== diff --git a/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java b/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java index 01ec3c2c74..41f6cb24cf 100644 --- a/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java +++ b/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java @@ -17,6 +17,14 @@ interface AsyncWritePlugin { */ Future doWriteAsync(PersistentImpl persistent); + /** + * Plugin Java API. + * + * Asynchronously writes a batch of persistent messages to the journal. The batch write + * must be atomic i.e. either all persistent messages in the batch are written or none. + */ + Future doWriteBatchAsync(Iterable persistentBatch); + /** * Plugin Java API. * diff --git a/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java b/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java index dc022ed644..8660b5dab7 100644 --- a/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java +++ b/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java @@ -15,6 +15,14 @@ interface SyncWritePlugin { */ void doWrite(PersistentImpl persistent) throws Exception; + /** + * Plugin Java API. + * + * Synchronously writes a batch of persistent messages to the journal. The batch write + * must be atomic i.e. either all persistent messages in the batch are written or none. + */ + void doWriteBatch(Iterable persistentBatch); + /** * Plugin Java API. * diff --git a/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java b/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java index 254e7fe3d9..cd7bdab38e 100644 --- a/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java +++ b/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java @@ -8,6 +8,692 @@ public final class MessageFormats { public static void registerAllExtensions( com.google.protobuf.ExtensionRegistry registry) { } + public interface PersistentMessageBatchOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // repeated .PersistentMessage batch = 1; + /** + * repeated .PersistentMessage batch = 1; + */ + java.util.List + getBatchList(); + /** + * repeated .PersistentMessage batch = 1; + */ + akka.persistence.serialization.MessageFormats.PersistentMessage getBatch(int index); + /** + * repeated .PersistentMessage batch = 1; + */ + int getBatchCount(); + /** + * repeated .PersistentMessage batch = 1; + */ + java.util.List + getBatchOrBuilderList(); + /** + * repeated .PersistentMessage batch = 1; + */ + akka.persistence.serialization.MessageFormats.PersistentMessageOrBuilder getBatchOrBuilder( + int index); + } + /** + * Protobuf type {@code PersistentMessageBatch} + */ + public static final class PersistentMessageBatch extends + com.google.protobuf.GeneratedMessage + implements PersistentMessageBatchOrBuilder { + // Use PersistentMessageBatch.newBuilder() to construct. + private PersistentMessageBatch(com.google.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private PersistentMessageBatch(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final PersistentMessageBatch defaultInstance; + public static PersistentMessageBatch getDefaultInstance() { + return defaultInstance; + } + + public PersistentMessageBatch getDefaultInstanceForType() { + return defaultInstance; + } + + private final com.google.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private PersistentMessageBatch( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 10: { + if (!((mutable_bitField0_ & 0x00000001) == 0x00000001)) { + batch_ = new java.util.ArrayList(); + mutable_bitField0_ |= 0x00000001; + } + batch_.add(input.readMessage(akka.persistence.serialization.MessageFormats.PersistentMessage.PARSER, extensionRegistry)); + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + if (((mutable_bitField0_ & 0x00000001) == 0x00000001)) { + batch_ = java.util.Collections.unmodifiableList(batch_); + } + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.persistence.serialization.MessageFormats.internal_static_PersistentMessageBatch_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.persistence.serialization.MessageFormats.internal_static_PersistentMessageBatch_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.persistence.serialization.MessageFormats.PersistentMessageBatch.class, akka.persistence.serialization.MessageFormats.PersistentMessageBatch.Builder.class); + } + + public static com.google.protobuf.Parser PARSER = + new com.google.protobuf.AbstractParser() { + public PersistentMessageBatch parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return new PersistentMessageBatch(input, extensionRegistry); + } + }; + + @java.lang.Override + public com.google.protobuf.Parser getParserForType() { + return PARSER; + } + + // repeated .PersistentMessage batch = 1; + public static final int BATCH_FIELD_NUMBER = 1; + private java.util.List batch_; + /** + * repeated .PersistentMessage batch = 1; + */ + public java.util.List getBatchList() { + return batch_; + } + /** + * repeated .PersistentMessage batch = 1; + */ + public java.util.List + getBatchOrBuilderList() { + return batch_; + } + /** + * repeated .PersistentMessage batch = 1; + */ + public int getBatchCount() { + return batch_.size(); + } + /** + * repeated .PersistentMessage batch = 1; + */ + public akka.persistence.serialization.MessageFormats.PersistentMessage getBatch(int index) { + return batch_.get(index); + } + /** + * repeated .PersistentMessage batch = 1; + */ + public akka.persistence.serialization.MessageFormats.PersistentMessageOrBuilder getBatchOrBuilder( + int index) { + return batch_.get(index); + } + + private void initFields() { + batch_ = java.util.Collections.emptyList(); + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + for (int i = 0; i < getBatchCount(); i++) { + if (!getBatch(i).isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + for (int i = 0; i < batch_.size(); i++) { + output.writeMessage(1, batch_.get(i)); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + for (int i = 0; i < batch_.size(); i++) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(1, batch_.get(i)); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static akka.persistence.serialization.MessageFormats.PersistentMessageBatch parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.persistence.serialization.MessageFormats.PersistentMessageBatch parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.persistence.serialization.MessageFormats.PersistentMessageBatch parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.persistence.serialization.MessageFormats.PersistentMessageBatch parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.persistence.serialization.MessageFormats.PersistentMessageBatch parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.persistence.serialization.MessageFormats.PersistentMessageBatch parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static akka.persistence.serialization.MessageFormats.PersistentMessageBatch parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static akka.persistence.serialization.MessageFormats.PersistentMessageBatch parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static akka.persistence.serialization.MessageFormats.PersistentMessageBatch parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.persistence.serialization.MessageFormats.PersistentMessageBatch parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(akka.persistence.serialization.MessageFormats.PersistentMessageBatch prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code PersistentMessageBatch} + */ + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements akka.persistence.serialization.MessageFormats.PersistentMessageBatchOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.persistence.serialization.MessageFormats.internal_static_PersistentMessageBatch_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.persistence.serialization.MessageFormats.internal_static_PersistentMessageBatch_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.persistence.serialization.MessageFormats.PersistentMessageBatch.class, akka.persistence.serialization.MessageFormats.PersistentMessageBatch.Builder.class); + } + + // Construct using akka.persistence.serialization.MessageFormats.PersistentMessageBatch.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + getBatchFieldBuilder(); + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + if (batchBuilder_ == null) { + batch_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000001); + } else { + batchBuilder_.clear(); + } + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return akka.persistence.serialization.MessageFormats.internal_static_PersistentMessageBatch_descriptor; + } + + public akka.persistence.serialization.MessageFormats.PersistentMessageBatch getDefaultInstanceForType() { + return akka.persistence.serialization.MessageFormats.PersistentMessageBatch.getDefaultInstance(); + } + + public akka.persistence.serialization.MessageFormats.PersistentMessageBatch build() { + akka.persistence.serialization.MessageFormats.PersistentMessageBatch result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public akka.persistence.serialization.MessageFormats.PersistentMessageBatch buildPartial() { + akka.persistence.serialization.MessageFormats.PersistentMessageBatch result = new akka.persistence.serialization.MessageFormats.PersistentMessageBatch(this); + int from_bitField0_ = bitField0_; + if (batchBuilder_ == null) { + if (((bitField0_ & 0x00000001) == 0x00000001)) { + batch_ = java.util.Collections.unmodifiableList(batch_); + bitField0_ = (bitField0_ & ~0x00000001); + } + result.batch_ = batch_; + } else { + result.batch_ = batchBuilder_.build(); + } + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof akka.persistence.serialization.MessageFormats.PersistentMessageBatch) { + return mergeFrom((akka.persistence.serialization.MessageFormats.PersistentMessageBatch)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(akka.persistence.serialization.MessageFormats.PersistentMessageBatch other) { + if (other == akka.persistence.serialization.MessageFormats.PersistentMessageBatch.getDefaultInstance()) return this; + if (batchBuilder_ == null) { + if (!other.batch_.isEmpty()) { + if (batch_.isEmpty()) { + batch_ = other.batch_; + bitField0_ = (bitField0_ & ~0x00000001); + } else { + ensureBatchIsMutable(); + batch_.addAll(other.batch_); + } + onChanged(); + } + } else { + if (!other.batch_.isEmpty()) { + if (batchBuilder_.isEmpty()) { + batchBuilder_.dispose(); + batchBuilder_ = null; + batch_ = other.batch_; + bitField0_ = (bitField0_ & ~0x00000001); + batchBuilder_ = + com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders ? + getBatchFieldBuilder() : null; + } else { + batchBuilder_.addAllMessages(other.batch_); + } + } + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + for (int i = 0; i < getBatchCount(); i++) { + if (!getBatch(i).isInitialized()) { + + return false; + } + } + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + akka.persistence.serialization.MessageFormats.PersistentMessageBatch parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (akka.persistence.serialization.MessageFormats.PersistentMessageBatch) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // repeated .PersistentMessage batch = 1; + private java.util.List batch_ = + java.util.Collections.emptyList(); + private void ensureBatchIsMutable() { + if (!((bitField0_ & 0x00000001) == 0x00000001)) { + batch_ = new java.util.ArrayList(batch_); + bitField0_ |= 0x00000001; + } + } + + private com.google.protobuf.RepeatedFieldBuilder< + akka.persistence.serialization.MessageFormats.PersistentMessage, akka.persistence.serialization.MessageFormats.PersistentMessage.Builder, akka.persistence.serialization.MessageFormats.PersistentMessageOrBuilder> batchBuilder_; + + /** + * repeated .PersistentMessage batch = 1; + */ + public java.util.List getBatchList() { + if (batchBuilder_ == null) { + return java.util.Collections.unmodifiableList(batch_); + } else { + return batchBuilder_.getMessageList(); + } + } + /** + * repeated .PersistentMessage batch = 1; + */ + public int getBatchCount() { + if (batchBuilder_ == null) { + return batch_.size(); + } else { + return batchBuilder_.getCount(); + } + } + /** + * repeated .PersistentMessage batch = 1; + */ + public akka.persistence.serialization.MessageFormats.PersistentMessage getBatch(int index) { + if (batchBuilder_ == null) { + return batch_.get(index); + } else { + return batchBuilder_.getMessage(index); + } + } + /** + * repeated .PersistentMessage batch = 1; + */ + public Builder setBatch( + int index, akka.persistence.serialization.MessageFormats.PersistentMessage value) { + if (batchBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + ensureBatchIsMutable(); + batch_.set(index, value); + onChanged(); + } else { + batchBuilder_.setMessage(index, value); + } + return this; + } + /** + * repeated .PersistentMessage batch = 1; + */ + public Builder setBatch( + int index, akka.persistence.serialization.MessageFormats.PersistentMessage.Builder builderForValue) { + if (batchBuilder_ == null) { + ensureBatchIsMutable(); + batch_.set(index, builderForValue.build()); + onChanged(); + } else { + batchBuilder_.setMessage(index, builderForValue.build()); + } + return this; + } + /** + * repeated .PersistentMessage batch = 1; + */ + public Builder addBatch(akka.persistence.serialization.MessageFormats.PersistentMessage value) { + if (batchBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + ensureBatchIsMutable(); + batch_.add(value); + onChanged(); + } else { + batchBuilder_.addMessage(value); + } + return this; + } + /** + * repeated .PersistentMessage batch = 1; + */ + public Builder addBatch( + int index, akka.persistence.serialization.MessageFormats.PersistentMessage value) { + if (batchBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + ensureBatchIsMutable(); + batch_.add(index, value); + onChanged(); + } else { + batchBuilder_.addMessage(index, value); + } + return this; + } + /** + * repeated .PersistentMessage batch = 1; + */ + public Builder addBatch( + akka.persistence.serialization.MessageFormats.PersistentMessage.Builder builderForValue) { + if (batchBuilder_ == null) { + ensureBatchIsMutable(); + batch_.add(builderForValue.build()); + onChanged(); + } else { + batchBuilder_.addMessage(builderForValue.build()); + } + return this; + } + /** + * repeated .PersistentMessage batch = 1; + */ + public Builder addBatch( + int index, akka.persistence.serialization.MessageFormats.PersistentMessage.Builder builderForValue) { + if (batchBuilder_ == null) { + ensureBatchIsMutable(); + batch_.add(index, builderForValue.build()); + onChanged(); + } else { + batchBuilder_.addMessage(index, builderForValue.build()); + } + return this; + } + /** + * repeated .PersistentMessage batch = 1; + */ + public Builder addAllBatch( + java.lang.Iterable values) { + if (batchBuilder_ == null) { + ensureBatchIsMutable(); + super.addAll(values, batch_); + onChanged(); + } else { + batchBuilder_.addAllMessages(values); + } + return this; + } + /** + * repeated .PersistentMessage batch = 1; + */ + public Builder clearBatch() { + if (batchBuilder_ == null) { + batch_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000001); + onChanged(); + } else { + batchBuilder_.clear(); + } + return this; + } + /** + * repeated .PersistentMessage batch = 1; + */ + public Builder removeBatch(int index) { + if (batchBuilder_ == null) { + ensureBatchIsMutable(); + batch_.remove(index); + onChanged(); + } else { + batchBuilder_.remove(index); + } + return this; + } + /** + * repeated .PersistentMessage batch = 1; + */ + public akka.persistence.serialization.MessageFormats.PersistentMessage.Builder getBatchBuilder( + int index) { + return getBatchFieldBuilder().getBuilder(index); + } + /** + * repeated .PersistentMessage batch = 1; + */ + public akka.persistence.serialization.MessageFormats.PersistentMessageOrBuilder getBatchOrBuilder( + int index) { + if (batchBuilder_ == null) { + return batch_.get(index); } else { + return batchBuilder_.getMessageOrBuilder(index); + } + } + /** + * repeated .PersistentMessage batch = 1; + */ + public java.util.List + getBatchOrBuilderList() { + if (batchBuilder_ != null) { + return batchBuilder_.getMessageOrBuilderList(); + } else { + return java.util.Collections.unmodifiableList(batch_); + } + } + /** + * repeated .PersistentMessage batch = 1; + */ + public akka.persistence.serialization.MessageFormats.PersistentMessage.Builder addBatchBuilder() { + return getBatchFieldBuilder().addBuilder( + akka.persistence.serialization.MessageFormats.PersistentMessage.getDefaultInstance()); + } + /** + * repeated .PersistentMessage batch = 1; + */ + public akka.persistence.serialization.MessageFormats.PersistentMessage.Builder addBatchBuilder( + int index) { + return getBatchFieldBuilder().addBuilder( + index, akka.persistence.serialization.MessageFormats.PersistentMessage.getDefaultInstance()); + } + /** + * repeated .PersistentMessage batch = 1; + */ + public java.util.List + getBatchBuilderList() { + return getBatchFieldBuilder().getBuilderList(); + } + private com.google.protobuf.RepeatedFieldBuilder< + akka.persistence.serialization.MessageFormats.PersistentMessage, akka.persistence.serialization.MessageFormats.PersistentMessage.Builder, akka.persistence.serialization.MessageFormats.PersistentMessageOrBuilder> + getBatchFieldBuilder() { + if (batchBuilder_ == null) { + batchBuilder_ = new com.google.protobuf.RepeatedFieldBuilder< + akka.persistence.serialization.MessageFormats.PersistentMessage, akka.persistence.serialization.MessageFormats.PersistentMessage.Builder, akka.persistence.serialization.MessageFormats.PersistentMessageOrBuilder>( + batch_, + ((bitField0_ & 0x00000001) == 0x00000001), + getParentForChildren(), + isClean()); + batch_ = null; + } + return batchBuilder_; + } + + // @@protoc_insertion_point(builder_scope:PersistentMessageBatch) + } + + static { + defaultInstance = new PersistentMessageBatch(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:PersistentMessageBatch) + } + public interface PersistentMessageOrBuilder extends com.google.protobuf.MessageOrBuilder { @@ -3059,6 +3745,11 @@ public final class MessageFormats { // @@protoc_insertion_point(class_scope:ConfirmMessage) } + private static com.google.protobuf.Descriptors.Descriptor + internal_static_PersistentMessageBatch_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_PersistentMessageBatch_fieldAccessorTable; private static com.google.protobuf.Descriptors.Descriptor internal_static_PersistentMessage_descriptor; private static @@ -3083,38 +3774,46 @@ public final class MessageFormats { descriptor; static { java.lang.String[] descriptorData = { - "\n\024MessageFormats.proto\"\371\001\n\021PersistentMes" + - "sage\022#\n\007payload\030\001 \001(\0132\022.PersistentPayloa" + - "d\022\022\n\nsequenceNr\030\002 \001(\003\022\023\n\013processorId\030\003 \001" + - "(\t\022\021\n\tchannelId\030\004 \001(\t\022\017\n\007deleted\030\005 \001(\010\022\020" + - "\n\010resolved\030\006 \001(\010\022\020\n\010confirms\030\010 \003(\t\022\'\n\016co" + - "nfirmMessage\030\n \001(\0132\017.ConfirmMessage\022\025\n\rc" + - "onfirmTarget\030\t \001(\t\022\016\n\006sender\030\007 \001(\t\"S\n\021Pe" + - "rsistentPayload\022\024\n\014serializerId\030\001 \002(\005\022\017\n" + - "\007payload\030\002 \002(\014\022\027\n\017payloadManifest\030\003 \001(\014\"" + - "L\n\016ConfirmMessage\022\023\n\013processorId\030\001 \001(\t\022\022", - "\n\nsequenceNr\030\002 \001(\003\022\021\n\tchannelId\030\003 \001(\tB\"\n" + - "\036akka.persistence.serializationH\001" + "\n\024MessageFormats.proto\";\n\026PersistentMess" + + "ageBatch\022!\n\005batch\030\001 \003(\0132\022.PersistentMess" + + "age\"\371\001\n\021PersistentMessage\022#\n\007payload\030\001 \001" + + "(\0132\022.PersistentPayload\022\022\n\nsequenceNr\030\002 \001" + + "(\003\022\023\n\013processorId\030\003 \001(\t\022\021\n\tchannelId\030\004 \001" + + "(\t\022\017\n\007deleted\030\005 \001(\010\022\020\n\010resolved\030\006 \001(\010\022\020\n" + + "\010confirms\030\010 \003(\t\022\'\n\016confirmMessage\030\n \001(\0132" + + "\017.ConfirmMessage\022\025\n\rconfirmTarget\030\t \001(\t\022" + + "\016\n\006sender\030\007 \001(\t\"S\n\021PersistentPayload\022\024\n\014" + + "serializerId\030\001 \002(\005\022\017\n\007payload\030\002 \002(\014\022\027\n\017p", + "ayloadManifest\030\003 \001(\014\"L\n\016ConfirmMessage\022\023" + + "\n\013processorId\030\001 \001(\t\022\022\n\nsequenceNr\030\002 \001(\003\022" + + "\021\n\tchannelId\030\003 \001(\tB\"\n\036akka.persistence.s" + + "erializationH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { public com.google.protobuf.ExtensionRegistry assignDescriptors( com.google.protobuf.Descriptors.FileDescriptor root) { descriptor = root; - internal_static_PersistentMessage_descriptor = + internal_static_PersistentMessageBatch_descriptor = getDescriptor().getMessageTypes().get(0); + internal_static_PersistentMessageBatch_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_PersistentMessageBatch_descriptor, + new java.lang.String[] { "Batch", }); + internal_static_PersistentMessage_descriptor = + getDescriptor().getMessageTypes().get(1); internal_static_PersistentMessage_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_PersistentMessage_descriptor, new java.lang.String[] { "Payload", "SequenceNr", "ProcessorId", "ChannelId", "Deleted", "Resolved", "Confirms", "ConfirmMessage", "ConfirmTarget", "Sender", }); internal_static_PersistentPayload_descriptor = - getDescriptor().getMessageTypes().get(1); + getDescriptor().getMessageTypes().get(2); internal_static_PersistentPayload_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_PersistentPayload_descriptor, new java.lang.String[] { "SerializerId", "Payload", "PayloadManifest", }); internal_static_ConfirmMessage_descriptor = - getDescriptor().getMessageTypes().get(2); + getDescriptor().getMessageTypes().get(3); internal_static_ConfirmMessage_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ConfirmMessage_descriptor, diff --git a/akka-persistence/src/main/protobuf/MessageFormats.proto b/akka-persistence/src/main/protobuf/MessageFormats.proto index eaf389c044..4b31a4e459 100644 --- a/akka-persistence/src/main/protobuf/MessageFormats.proto +++ b/akka-persistence/src/main/protobuf/MessageFormats.proto @@ -5,6 +5,10 @@ option java_package = "akka.persistence.serialization"; option optimize_for = SPEED; +message PersistentMessageBatch { + repeated PersistentMessage batch = 1; +} + message PersistentMessage { optional PersistentPayload payload = 1; optional int64 sequenceNr = 2; diff --git a/akka-persistence/src/main/resources/reference.conf b/akka-persistence/src/main/resources/reference.conf index ed9f7e8576..633b6ff2da 100644 --- a/akka-persistence/src/main/resources/reference.conf +++ b/akka-persistence/src/main/resources/reference.conf @@ -18,6 +18,7 @@ akka { serialization-bindings { "akka.persistence.serialization.Snapshot" = akka-persistence-snapshot + "akka.persistence.PersistentBatch" = akka-persistence-message "akka.persistence.PersistentImpl" = akka-persistence-message "akka.persistence.Confirm" = akka-persistence-message } diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 5257c52fd2..e645b47ba7 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -21,20 +21,37 @@ private[persistence] trait Eventsourced extends Processor { def aroundReceive(receive: Receive, message: Any): Unit } + /** + * Processor recovery state. Waits for recovery completion and then changes to + * `processingCommands` + */ + private val recovering: State = new State { + def aroundReceive(receive: Receive, message: Any) { + Eventsourced.super.aroundReceive(receive, message) + message match { + case _: ReplaySuccess | _: ReplayFailure ⇒ currentState = processingCommands + case _ ⇒ + } + } + } + /** * Command processing state. If event persistence is pending after processing a * command, event persistence is triggered and state changes to `persistingEvents`. + * + * There's no need to loop commands though the journal any more i.e. they can now be + * directly offered as `LoopSuccess` to the state machine implemented by `Processor`. */ private val processingCommands: State = new State { - def aroundReceive(receive: Receive, message: Any) = message match { - case m if (persistInvocations.isEmpty) ⇒ { - Eventsourced.super.aroundReceive(receive, m) - if (!persistInvocations.isEmpty) { - persistInvocations = persistInvocations.reverse - persistCandidates = persistCandidates.reverse - persistCandidates.foreach(self forward Persistent(_)) - currentState = persistingEvents - } + def aroundReceive(receive: Receive, message: Any) { + Eventsourced.super.aroundReceive(receive, LoopSuccess(message)) + if (!persistInvocations.isEmpty) { + currentState = persistingEvents + Eventsourced.super.aroundReceive(receive, PersistentBatch(persistentEventBatch.reverse)) + persistInvocations = persistInvocations.reverse + persistentEventBatch = Nil + } else { + processorStash.unstash() } } } @@ -46,9 +63,13 @@ private[persistence] trait Eventsourced extends Processor { */ private val persistingEvents: State = new State { def aroundReceive(receive: Receive, message: Any) = message match { - case p: PersistentImpl if identical(p.payload, persistCandidates.head) ⇒ { - Eventsourced.super.aroundReceive(receive, message) - persistCandidates = persistCandidates.tail + case PersistentBatch(b) ⇒ { + b.foreach(deleteMessage) + throw new UnsupportedOperationException("Persistent command batches not supported") + } + case p: PersistentImpl ⇒ { + deleteMessage(p) + throw new UnsupportedOperationException("Persistent commands not supported") } case WriteSuccess(p) if identical(p.payload, persistInvocations.head._1) ⇒ { withCurrentPersistent(p)(p ⇒ persistInvocations.head._2(p.payload)) @@ -65,7 +86,7 @@ private[persistence] trait Eventsourced extends Processor { persistInvocations = persistInvocations.tail if (persistInvocations.isEmpty) { currentState = processingCommands - processorStash.unstashAll() + processorStash.unstash() } } @@ -74,9 +95,9 @@ private[persistence] trait Eventsourced extends Processor { } private var persistInvocations: List[(Any, Any ⇒ Unit)] = Nil - private var persistCandidates: List[Any] = Nil + private var persistentEventBatch: List[PersistentImpl] = Nil - private var currentState: State = processingCommands + private var currentState: State = recovering private val processorStash = createProcessorStash /** @@ -103,7 +124,7 @@ private[persistence] trait Eventsourced extends Processor { */ final def persist[A](event: A)(handler: A ⇒ Unit): Unit = { persistInvocations = (event, handler.asInstanceOf[Any ⇒ Unit]) :: persistInvocations - persistCandidates = event :: persistCandidates + persistentEventBatch = PersistentImpl(event) :: persistentEventBatch } /** @@ -137,6 +158,13 @@ private[persistence] trait Eventsourced extends Processor { */ def receiveCommand: Receive + override def unstashAll() { + // Internally, all messages are processed by unstashing them from + // the internal stash one-by-one. Hence, an unstashAll() from the + // user stash must be prepended to the internal stash. + processorStash.prepend(clearStash()) + } + /** * INTERNAL API. */ @@ -144,6 +172,22 @@ private[persistence] trait Eventsourced extends Processor { currentState.aroundReceive(receive, message) } + /** + * Calls `super.preRestart` then unstashes all messages from the internal stash. + */ + override def preRestart(reason: Throwable, message: Option[Any]) { + super.preRestart(reason, message) + processorStash.unstashAll() + } + + /** + * Calls `super.postStop` then unstashes all messages from the internal stash. + */ + override def postStop() { + super.postStop() + processorStash.unstashAll() + } + /** * INTERNAL API. */ @@ -242,7 +286,9 @@ abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Events * Command handler. Typically validates commands against current state (and/or by * communication with other actors). On successful validation, one or more events are * derived from a command and these events are then persisted by calling `persist`. - * Commands sent to event sourced processors should not be [[Persistent]] messages. + * Commands sent to event sourced processors must not be [[Persistent]] or + * [[PersistentBatch]] messages. In this case an `UnsupportedOperationException` is + * thrown by the processor. */ def onReceiveCommand(msg: Any): Unit } \ No newline at end of file diff --git a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala index 45c0228a83..81b7e21470 100644 --- a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala +++ b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala @@ -4,6 +4,8 @@ package akka.persistence +import scala.collection.immutable + import akka.actor._ /** @@ -18,6 +20,14 @@ private[persistence] object JournalProtocol { */ case class Delete(persistent: Persistent) + /** + * Instructs a journal to persist a sequence of messages. + * + * @param persistentBatch batch of messages to be persisted. + * @param processor requesting processor. + */ + case class WriteBatch(persistentBatch: immutable.Seq[PersistentImpl], processor: ActorRef) + /** * Instructs a journal to persist a message. * diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala index 7d8b33d95f..075a2777a0 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala @@ -4,9 +4,13 @@ package akka.persistence +import java.lang.{ Iterable ⇒ JIterable } import java.util.{ List ⇒ JList } +import scala.collection.immutable + import akka.actor.ActorRef +import akka.japi.Util.immutableSeq /** * Persistent message. @@ -78,6 +82,27 @@ object Persistent { Some((persistent.payload, persistent.sequenceNr)) } +/** + * Instructs a [[Processor]] to atomically write the contained [[Persistent]] messages to the + * journal. The processor receives the written messages individually as [[Persistent]] messages. + * During recovery, they are also replayed individually. + */ +case class PersistentBatch(persistentBatch: immutable.Seq[Persistent]) { + /** + * INTERNAL API. + */ + private[persistence] def persistentImplList: List[PersistentImpl] = + persistentBatch.toList.asInstanceOf[List[PersistentImpl]] +} + +object PersistentBatch { + /** + * JAVA API. + */ + def create(persistentBatch: JIterable[Persistent]) = + PersistentBatch(immutableSeq(persistentBatch)) +} + /** * Plugin API. * @@ -123,6 +148,9 @@ case class PersistentImpl( * Java Plugin API. */ def getConfirms: JList[String] = confirms.asJava + + private[persistence] def prepareWrite(sender: ActorRef) = + copy(sender = sender, resolved = false, confirmTarget = null, confirmMessage = null) } object PersistentImpl { diff --git a/akka-persistence/src/main/scala/akka/persistence/Processor.scala b/akka-persistence/src/main/scala/akka/persistence/Processor.scala index 0e1244075a..251ef053c4 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Processor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Processor.scala @@ -4,6 +4,8 @@ package akka.persistence +import scala.collection.immutable + import akka.actor._ import akka.dispatch._ @@ -49,6 +51,7 @@ import akka.dispatch._ * * @see [[UntypedProcessor]] * @see [[Recover]] + * @see [[PersistentBatch]] */ trait Processor extends Actor with Stash { import JournalProtocol._ @@ -151,9 +154,10 @@ trait Processor extends Actor with Stash { throw new ActorKilledException(errorMsg) } } - case LoopSuccess(m) ⇒ process(receive, m) - case p: PersistentImpl ⇒ journal forward Write(p.copy(processorId = processorId, sequenceNr = nextSequenceNr()), self) - case m ⇒ journal forward Loop(m, self) + case LoopSuccess(m) ⇒ process(receive, m) + case p: PersistentImpl ⇒ journal forward Write(p.copy(processorId = processorId, sequenceNr = nextSequenceNr()), self) + case pb: PersistentBatch ⇒ journal forward WriteBatch(pb.persistentImplList.map(_.copy(processorId = processorId, sequenceNr = nextSequenceNr())), self) + case m ⇒ journal forward Loop(m, self) } } @@ -362,6 +366,16 @@ trait Processor extends Actor with Stash { def stash(): Unit = theStash :+= currentEnvelope + def prepend(others: immutable.Seq[Envelope]): Unit = + others.reverseIterator.foreach(env ⇒ theStash = env +: theStash) + + def unstash(): Unit = try { + if (theStash.nonEmpty) { + mailbox.enqueueFirst(self, theStash.head) + theStash = theStash.tail + } + } + def unstashAll(): Unit = try { val i = theStash.reverseIterator while (i.hasNext) mailbox.enqueueFirst(self, i.next()) @@ -377,7 +391,24 @@ trait Processor extends Actor with Stash { * Processor specific stash used internally to avoid interference with user stash. */ private[persistence] trait ProcessorStash { + /** + * Appends the current message to this stash. + */ def stash() + + /** + * Prepends `others` to this stash. + */ + def prepend(others: immutable.Seq[Envelope]) + + /** + * Unstashes a single message from this stash. + */ + def unstash() + + /** + * Unstashes all messages from this stash. + */ def unstashAll() } @@ -434,6 +465,7 @@ private[persistence] trait ProcessorStash { * * @see [[Processor]] * @see [[Recover]] + * @see [[PersistentBatch]] */ abstract class UntypedProcessor extends UntypedActor with Processor { diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index cc3b9e07e0..b10328b398 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -4,6 +4,7 @@ package akka.persistence.journal +import scala.collection.immutable import scala.concurrent.Future import scala.util._ @@ -11,7 +12,6 @@ import akka.actor._ import akka.pattern.{ pipe, PromiseActorRef } import akka.persistence._ import akka.persistence.JournalProtocol._ -import akka.serialization.Serialization /** * Abstract journal, optimized for asynchronous, non-blocking writes. @@ -28,13 +28,26 @@ trait AsyncWriteJournal extends Actor with AsyncReplay { val csdr = sender val cctr = resequencerCounter val psdr = if (sender.isInstanceOf[PromiseActorRef]) context.system.deadLetters else sender - writeAsync(persistent.copy(sender = psdr, resolved = false, confirmTarget = null, confirmMessage = null)) map { + writeAsync(persistent.prepareWrite(psdr)) map { _ ⇒ Desequenced(WriteSuccess(persistent), cctr, processor, csdr) } recover { case e ⇒ Desequenced(WriteFailure(persistent, e), cctr, processor, csdr) } pipeTo (resequencer) resequencerCounter += 1 } + case WriteBatch(persistentBatch, processor) ⇒ { + val csdr = sender + val cctr = resequencerCounter + val psdr = if (sender.isInstanceOf[PromiseActorRef]) context.system.deadLetters else sender + def resequence(f: PersistentImpl ⇒ Any) = persistentBatch.zipWithIndex.foreach { + case (p, i) ⇒ resequencer ! Desequenced(f(p), cctr + i, processor, csdr) + } + writeBatchAsync(persistentBatch.map(_.prepareWrite(psdr))) onComplete { + case Success(_) ⇒ resequence(WriteSuccess(_)) + case Failure(e) ⇒ resequence(WriteFailure(_, e)) + } + resequencerCounter += persistentBatch.length + } case Replay(fromSequenceNr, toSequenceNr, processorId, processor) ⇒ { // Send replayed messages and replay result to processor directly. No need // to resequence replayed messages relative to written and looped messages. @@ -73,6 +86,14 @@ trait AsyncWriteJournal extends Actor with AsyncReplay { */ def writeAsync(persistent: PersistentImpl): Future[Unit] + /** + * Plugin API. + * + * Asynchronously writes a batch of persistent messages to the journal. The batch write + * must be atomic i.e. either all persistent messages in the batch are written or none. + */ + def writeBatchAsync(persistentBatch: immutable.Seq[PersistentImpl]): Future[Unit] + /** * Plugin API. * diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala index dfa4c7e7d5..c0310e73dd 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala @@ -4,12 +4,12 @@ package akka.persistence.journal +import scala.collection.immutable import scala.util._ import akka.actor.Actor import akka.pattern.{ pipe, PromiseActorRef } import akka.persistence._ -import akka.serialization.Serialization /** * Abstract journal, optimized for synchronous writes. @@ -23,11 +23,18 @@ trait SyncWriteJournal extends Actor with AsyncReplay { final def receive = { case Write(persistent, processor) ⇒ { val sdr = if (sender.isInstanceOf[PromiseActorRef]) context.system.deadLetters else sender - Try(write(persistent.copy(sender = sdr, resolved = false, confirmTarget = null, confirmMessage = null))) match { + Try(write(persistent.prepareWrite(sdr))) match { case Success(_) ⇒ processor forward WriteSuccess(persistent) case Failure(e) ⇒ processor forward WriteFailure(persistent, e); throw e } } + case WriteBatch(persistentBatch, processor) ⇒ { + val sdr = if (sender.isInstanceOf[PromiseActorRef]) context.system.deadLetters else sender + Try(writeBatch(persistentBatch.map(_.prepareWrite(sdr)))) match { + case Success(_) ⇒ persistentBatch.foreach(processor forward WriteSuccess(_)) + case Failure(e) ⇒ persistentBatch.foreach(processor forward WriteFailure(_, e)); throw e + } + } case Replay(fromSequenceNr, toSequenceNr, processorId, processor) ⇒ { replayAsync(processorId, fromSequenceNr, toSequenceNr) { p ⇒ if (!p.deleted) processor.tell(Replayed(p), p.sender) @@ -57,6 +64,14 @@ trait SyncWriteJournal extends Actor with AsyncReplay { */ def write(persistent: PersistentImpl): Unit + /** + * Plugin API. + * + * Synchronously writes a batch of persistent messages to the journal. The batch write + * must be atomic i.e. either all persistent messages in the batch are written or none. + */ + def writeBatch(persistentBatch: immutable.Seq[PersistentImpl]): Unit + /** * Plugin API. * diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala index fd8f723bc7..11660b1db5 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala @@ -4,6 +4,7 @@ package akka.persistence.journal.inmem +import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.duration._ import scala.language.postfixOps @@ -29,6 +30,9 @@ private[persistence] class InmemJournal extends AsyncWriteJournal { def writeAsync(persistent: PersistentImpl): Future[Unit] = (store ? Write(persistent)).mapTo[Unit] + def writeBatchAsync(persistentBatch: immutable.Seq[PersistentImpl]): Future[Unit] = + (store ? WriteBatch(persistentBatch)).mapTo[Unit] + def deleteAsync(persistent: PersistentImpl): Future[Unit] = (store ? Delete(persistent)).mapTo[Unit] @@ -47,6 +51,7 @@ private[persistence] class InmemStore extends Actor { def receive = { case Write(p) ⇒ add(p); success() + case WriteBatch(pb) ⇒ pb.foreach(add); success() case Delete(p) ⇒ update(p.processorId, p.sequenceNr)(_.copy(deleted = true)); success() case Confirm(pid, snr, cid) ⇒ update(pid, snr)(p ⇒ p.copy(confirms = cid +: p.confirms)); success() case Replay(pid, fromSnr, toSnr, callback) ⇒ { @@ -84,6 +89,7 @@ private[persistence] class InmemStore extends Actor { private[persistence] object InmemStore { case class Write(p: PersistentImpl) + case class WriteBatch(pb: Seq[PersistentImpl]) case class Delete(p: PersistentImpl) case class Confirm(processorId: String, sequenceNr: Long, channelId: String) case class Replay(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, replayCallback: (PersistentImpl) ⇒ Unit) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala index e841a5622a..d3d557001f 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala @@ -4,7 +4,8 @@ package akka.persistence.journal.japi -import scala.concurrent.Future +import scala.collection.immutable +import scala.collection.JavaConverters._ import akka.persistence.journal.{ AsyncWriteJournal ⇒ SAsyncWriteJournal } import akka.persistence.PersistentImpl @@ -20,6 +21,9 @@ abstract class AsyncWriteJournal extends AsyncReplay with SAsyncWriteJournal wit final def writeAsync(persistent: PersistentImpl) = doWriteAsync(persistent).map(Unit.unbox) + final def writeBatchAsync(persistentBatch: immutable.Seq[PersistentImpl]) = + doWriteBatchAsync(persistentBatch.asJava).map(Unit.unbox) + final def deleteAsync(persistent: PersistentImpl) = doDeleteAsync(persistent).map(Unit.unbox) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala index 992ca8f5d6..e91af7ce0e 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala @@ -4,6 +4,9 @@ package akka.persistence.journal.japi +import scala.collection.immutable +import scala.collection.JavaConverters._ + import akka.persistence.journal.{ SyncWriteJournal ⇒ SSyncWriteJournal } import akka.persistence.PersistentImpl @@ -16,6 +19,9 @@ abstract class SyncWriteJournal extends AsyncReplay with SSyncWriteJournal with final def write(persistent: PersistentImpl) = doWrite(persistent) + final def writeBatch(persistentBatch: immutable.Seq[PersistentImpl]) = + doWriteBatch(persistentBatch.asJava) + final def delete(persistent: PersistentImpl) = doDelete(persistent) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala index b1c8c14543..1eed3cd9df 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala @@ -6,6 +6,8 @@ package akka.persistence.journal.leveldb import java.io.File +import scala.collection.immutable + import org.iq80.leveldb._ import akka.persistence._ @@ -36,11 +38,11 @@ private[leveldb] class LeveldbJournal extends SyncWriteJournal with LeveldbIdMap import Key._ - def write(persistent: PersistentImpl) = withBatch { batch ⇒ - val nid = numericId(persistent.processorId) - batch.put(keyToBytes(counterKey(nid)), counterToBytes(persistent.sequenceNr)) - batch.put(keyToBytes(Key(nid, persistent.sequenceNr, 0)), persistentToBytes(persistent)) - } + def write(persistent: PersistentImpl) = + withBatch(batch ⇒ addToBatch(persistent, batch)) + + def writeBatch(persistentBatch: immutable.Seq[PersistentImpl]) = + withBatch(batch ⇒ persistentBatch.foreach(persistent ⇒ addToBatch(persistent, batch))) def delete(persistent: PersistentImpl) { leveldb.put(keyToBytes(deletionKey(numericId(persistent.processorId), persistent.sequenceNr)), Array.empty[Byte]) @@ -56,6 +58,12 @@ private[leveldb] class LeveldbJournal extends SyncWriteJournal with LeveldbIdMap def persistentToBytes(p: PersistentImpl): Array[Byte] = serialization.serialize(p).get def persistentFromBytes(a: Array[Byte]): PersistentImpl = serialization.deserialize(a, classOf[PersistentImpl]).get + private def addToBatch(persistent: PersistentImpl, batch: WriteBatch): Unit = { + val nid = numericId(persistent.processorId) + batch.put(keyToBytes(counterKey(nid)), counterToBytes(persistent.sequenceNr)) + batch.put(keyToBytes(Key(nid, persistent.sequenceNr, 0)), persistentToBytes(persistent)) + } + private def withBatch[R](body: WriteBatch ⇒ R): R = { val batch = leveldb.createWriteBatch() try { diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala index afcbbe0183..0da51c0c47 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala @@ -9,6 +9,7 @@ import scala.language.existentials import com.google.protobuf._ import akka.actor.ExtendedActorSystem +import akka.japi.Util.immutableSeq import akka.persistence._ import akka.persistence.serialization.MessageFormats._ import akka.serialization._ @@ -19,6 +20,7 @@ import akka.serialization._ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { import PersistentImpl.Undefined + val PersistentBatchClass = classOf[PersistentBatch] val PersistentClass = classOf[PersistentImpl] val ConfirmClass = classOf[Confirm] @@ -26,25 +28,27 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { def includeManifest: Boolean = true /** - * Serializes a [[Persistent]] message. Delegates serialization of the persistent message's - * payload to a matching `akka.serialization.Serializer`. + * Serializes [[PersistentBatch]] and [[Persistent]]. Delegates serialization of a + * persistent message's payload to a matching `akka.serialization.Serializer`. */ def toBinary(o: AnyRef): Array[Byte] = o match { - case p: PersistentImpl ⇒ persistentMessageBuilder(p).build().toByteArray - case c: Confirm ⇒ confirmMessageBuilder(c).build().toByteArray - case _ ⇒ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}") + case b: PersistentBatch ⇒ persistentMessageBatchBuilder(b).build().toByteArray + case p: PersistentImpl ⇒ persistentMessageBuilder(p).build().toByteArray + case c: Confirm ⇒ confirmMessageBuilder(c).build().toByteArray + case _ ⇒ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}") } /** - * Deserializes a [[Persistent]] message. Delegates deserialization of the persistent message's - * payload to a matching `akka.serialization.Serializer`. + * Deserializes [[PersistentBatch]] and [[Persistent]]. Delegates deserialization of a + * persistent message's payload to a matching `akka.serialization.Serializer`. */ def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = manifest match { case None ⇒ persistent(PersistentMessage.parseFrom(bytes)) case Some(c) ⇒ c match { - case PersistentClass ⇒ persistent(PersistentMessage.parseFrom(bytes)) - case ConfirmClass ⇒ confirm(ConfirmMessage.parseFrom(bytes)) - case _ ⇒ throw new IllegalArgumentException(s"Can't deserialize object of type ${c}") + case PersistentBatchClass ⇒ persistentBatch(PersistentMessageBatch.parseFrom(bytes)) + case PersistentClass ⇒ persistent(PersistentMessage.parseFrom(bytes)) + case ConfirmClass ⇒ confirm(ConfirmMessage.parseFrom(bytes)) + case _ ⇒ throw new IllegalArgumentException(s"Can't deserialize object of type ${c}") } } @@ -52,6 +56,12 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { // toBinary helpers // + private def persistentMessageBatchBuilder(persistentBatch: PersistentBatch) = { + val builder = PersistentMessageBatch.newBuilder + persistentBatch.persistentImplList.foreach(p ⇒ builder.addBatch(persistentMessageBuilder(p))) + builder + } + private def persistentMessageBuilder(persistent: PersistentImpl) = { val builder = PersistentMessage.newBuilder @@ -92,8 +102,10 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { // fromBinary helpers // + private def persistentBatch(persistentMessageBatch: PersistentMessageBatch): PersistentBatch = + PersistentBatch(immutableSeq(persistentMessageBatch.getBatchList).map(persistent)) + private def persistent(persistentMessage: PersistentMessage): PersistentImpl = { - import scala.collection.JavaConverters._ PersistentImpl( payload(persistentMessage.getPayload), persistentMessage.getSequenceNr, @@ -101,7 +113,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { if (persistentMessage.hasChannelId) persistentMessage.getChannelId else Undefined, persistentMessage.getDeleted, persistentMessage.getResolved, - persistentMessage.getConfirmsList.asScala.toList, + immutableSeq(persistentMessage.getConfirmsList), if (persistentMessage.hasConfirmMessage) confirm(persistentMessage.getConfirmMessage) else null, if (persistentMessage.hasConfirmTarget) system.provider.resolveActorRef(persistentMessage.getConfirmTarget) else null, if (persistentMessage.hasSender) system.provider.resolveActorRef(persistentMessage.getSender) else null) diff --git a/akka-persistence/src/test/scala/akka/persistence/EventsourcedSpec.scala b/akka-persistence/src/test/scala/akka/persistence/EventsourcedSpec.scala index 45a4be11b4..4232e24a15 100644 --- a/akka-persistence/src/test/scala/akka/persistence/EventsourcedSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/EventsourcedSpec.scala @@ -23,7 +23,7 @@ object EventsourcedSpec { } val commonBehavior: Receive = { - case "boom" ⇒ throw new Exception("boom") + case "boom" ⇒ throw new TestException("boom") case GetState ⇒ sender ! events.reverse } @@ -164,6 +164,51 @@ object EventsourcedSpec { } } + class UserStashManyProcessor(name: String) extends ExampleProcessor(name) { + val receiveCommand: Receive = commonBehavior orElse { + case Cmd("a") ⇒ persist(Evt("a")) { evt ⇒ + updateState(evt) + context.become(processC) + } + case Cmd("b-1") ⇒ persist(Evt("b-1"))(updateState) + case Cmd("b-2") ⇒ persist(Evt("b-2"))(updateState) + } + + val processC: Receive = { + case Cmd("c") ⇒ { + persist(Evt("c")) { evt ⇒ + updateState(evt) + context.unbecome() + } + unstashAll() + } + case other ⇒ stash() + } + } + + class UserStashFailureProcessor(name: String) extends ExampleProcessor(name) { + val receiveCommand: Receive = commonBehavior orElse { + case Cmd(data) ⇒ { + if (data == "b-2") throw new TestException("boom") + persist(Evt(data)) { event ⇒ + updateState(event) + if (data == "a") context.become(otherCommandHandler) + } + } + } + + val otherCommandHandler: Receive = { + case Cmd("c") ⇒ { + persist(Evt("c")) { event ⇒ + updateState(event) + context.unbecome() + } + unstashAll() + } + case other ⇒ stash() + } + } + class AnyValEventProcessor(name: String) extends ExampleProcessor(name) { val receiveCommand: Receive = { case Cmd("a") ⇒ persist(5)(evt ⇒ sender ! evt) @@ -272,7 +317,7 @@ abstract class EventsourcedSpec(config: Config) extends AkkaSpec(config) with Pe processor ! Cmd("a") expectMsg("a") } - "not interfere with the user stash" in { + "support user stash operations" in { val processor = namedProcessor[UserStashProcessor] processor ! Cmd("a") processor ! Cmd("b") @@ -281,6 +326,25 @@ abstract class EventsourcedSpec(config: Config) extends AkkaSpec(config) with Pe expectMsg("c") expectMsg("a") } + "support user stash operations with several stashed messages" in { + val processor = namedProcessor[UserStashManyProcessor] + val n = 10 + val cmds = 1 to n flatMap (_ ⇒ List(Cmd("a"), Cmd("b-1"), Cmd("b-2"), Cmd("c"))) + val evts = 1 to n flatMap (_ ⇒ List("a", "c", "b-1", "b-2")) + + cmds foreach (processor ! _) + processor ! GetState + expectMsg((List("a-1", "a-2") ++ evts)) + } + "support user stash operations under failures" in { + val processor = namedProcessor[UserStashFailureProcessor] + val bs = 1 to 10 map ("b-" + _) + processor ! Cmd("a") + bs foreach (processor ! Cmd(_)) + processor ! Cmd("c") + processor ! GetState + expectMsg(List("a-1", "a-2", "a", "c") ++ bs.filter(_ != "b-2")) + } "be able to persist events that extend AnyVal" in { val processor = namedProcessor[AnyValEventProcessor] processor ! Cmd("a") diff --git a/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala new file mode 100644 index 0000000000..c23ad9f920 --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala @@ -0,0 +1,163 @@ +package akka.persistence + +import scala.concurrent.duration._ +import scala.language.postfixOps + +import com.typesafe.config.ConfigFactory + +import akka.actor._ +import akka.testkit._ + +object PerformanceSpec { + // multiply cycles with 100 for more + // accurate throughput measurements + val config = + """ + akka.persistence.performance.cycles.warmup = 300 + akka.persistence.performance.cycles.load = 1000 + """ + + case object StartMeasure + case object StopMeasure + case class FailAt(sequenceNr: Long) + + abstract class PerformanceTestProcessor(name: String) extends NamedProcessor(name) { + val NanoToSecond = 1000.0 * 1000 * 1000 + + var startTime: Long = 0L + var stopTime: Long = 0L + + var startSequenceNr = 0L; + var stopSequenceNr = 0L; + + var failAt: Long = -1 + + val controlBehavior: Receive = { + case StartMeasure ⇒ { + startSequenceNr = lastSequenceNr + startTime = System.nanoTime + } + case StopMeasure ⇒ { + stopSequenceNr = lastSequenceNr + stopTime = System.nanoTime + sender ! (NanoToSecond * (stopSequenceNr - startSequenceNr) / (stopTime - startTime)) + } + case FailAt(sequenceNr) ⇒ failAt = sequenceNr + } + + override def postRestart(reason: Throwable) { + super.postRestart(reason) + receive(StartMeasure) + } + } + + class CommandsourcedTestProcessor(name: String) extends PerformanceTestProcessor(name) { + def receive = controlBehavior orElse { + case p: Persistent ⇒ { + if (lastSequenceNr % 1000 == 0) if (recoveryRunning) print("r") else print(".") + if (lastSequenceNr == failAt) throw new TestException("boom") + } + } + } + + class EventsourcedTestProcessor(name: String) extends PerformanceTestProcessor(name) with EventsourcedProcessor { + val receiveReplay: Receive = { + case _ ⇒ if (lastSequenceNr % 1000 == 0) print("r") + } + + val receiveCommand: Receive = controlBehavior orElse { + case cmd ⇒ persist(cmd) { _ ⇒ + if (lastSequenceNr % 1000 == 0) print(".") + if (lastSequenceNr == failAt) throw new TestException("boom") + } + } + } + + class StashingEventsourcedTestProcessor(name: String) extends PerformanceTestProcessor(name) with EventsourcedProcessor { + val receiveReplay: Receive = { + case _ ⇒ if (lastSequenceNr % 1000 == 0) print("r") + } + + val printProgress: PartialFunction[Any, Any] = { + case m ⇒ if (lastSequenceNr % 1000 == 0) print("."); m + } + + val receiveCommand: Receive = printProgress andThen (controlBehavior orElse { + case "a" ⇒ persist("a")(_ ⇒ context.become(processC)) + case "b" ⇒ persist("b")(_ ⇒ ()) + }) + + val processC: Receive = printProgress andThen { + case "c" ⇒ { + persist("c")(_ ⇒ context.unbecome()) + unstashAll() + } + case other ⇒ stash() + } + } +} + +class PerformanceSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "performance").withFallback(ConfigFactory.parseString(PerformanceSpec.config))) with PersistenceSpec with ImplicitSender { + import PerformanceSpec._ + + val warmupClycles = system.settings.config.getInt("akka.persistence.performance.cycles.warmup") + val loadCycles = system.settings.config.getInt("akka.persistence.performance.cycles.load") + + def stressCommandsourcedProcessor(failAt: Option[Long]): Unit = { + val processor = namedProcessor[CommandsourcedTestProcessor] + failAt foreach { processor ! FailAt(_) } + 1 to warmupClycles foreach { i ⇒ processor ! Persistent(s"msg${i}") } + processor ! StartMeasure + 1 to loadCycles foreach { i ⇒ processor ! Persistent(s"msg${i}") } + processor ! StopMeasure + expectMsgPF(100 seconds) { + case throughput: Double ⇒ println(f"\nthroughput = $throughput%.2f persistent commands per second") + } + } + + def stressEventsourcedProcessor(failAt: Option[Long]): Unit = { + val processor = namedProcessor[EventsourcedTestProcessor] + failAt foreach { processor ! FailAt(_) } + 1 to warmupClycles foreach { i ⇒ processor ! s"msg${i}" } + processor ! StartMeasure + 1 to loadCycles foreach { i ⇒ processor ! s"msg${i}" } + processor ! StopMeasure + expectMsgPF(100 seconds) { + case throughput: Double ⇒ println(f"\nthroughput = $throughput%.2f persistent events per second") + } + } + + def stressStashingEventsourcedProcessor(): Unit = { + val processor = namedProcessor[StashingEventsourcedTestProcessor] + 1 to warmupClycles foreach { i ⇒ processor ! "b" } + processor ! StartMeasure + val cmds = 1 to (loadCycles / 3) flatMap (_ ⇒ List("a", "b", "c")) + processor ! StartMeasure + cmds foreach (processor ! _) + processor ! StopMeasure + expectMsgPF(100 seconds) { + case throughput: Double ⇒ println(f"\nthroughput = $throughput%.2f persistent events per second") + } + } + + "A command sourced processor" should { + "have some reasonable throughput" in { + stressCommandsourcedProcessor(None) + } + "have some reasonable throughput under failure conditions" in { + stressCommandsourcedProcessor(Some(warmupClycles + loadCycles / 10)) + } + } + + "An event sourced processor" should { + "have some reasonable throughput" in { + stressEventsourcedProcessor(None) + } + "have some reasonable throughput under failure conditions" in { + stressEventsourcedProcessor(Some(warmupClycles + loadCycles / 10)) + } + "have some reasonable throughput with stashing and unstashing every 3rd command" in { + stressStashingEventsourcedProcessor() + } + } +} diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala index 5617e1a0db..8bb3ee3eff 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala @@ -8,6 +8,7 @@ import java.io.File import java.util.concurrent.atomic.AtomicInteger import scala.reflect.ClassTag +import scala.util.control.NoStackTrace import com.typesafe.config.ConfigFactory @@ -53,12 +54,12 @@ trait PersistenceSpec extends BeforeAndAfterEach { this: AkkaSpec ⇒ object PersistenceSpec { def config(plugin: String, test: String) = ConfigFactory.parseString( s""" - |serialize-creators = on - |serialize-messages = on - |akka.persistence.journal.plugin = "akka.persistence.journal.${plugin}" - |akka.persistence.journal.leveldb.dir = "target/journal-${test}-spec" - |akka.persistence.snapshot-store.local.dir = "target/snapshots-${test}-spec/" - """.stripMargin) + serialize-creators = on + serialize-messages = on + akka.persistence.journal.plugin = "akka.persistence.journal.${plugin}" + akka.persistence.journal.leveldb.dir = "target/journal-${test}-spec" + akka.persistence.snapshot-store.local.dir = "target/snapshots-${test}-spec/" + """) } abstract class NamedProcessor(name: String) extends Processor { @@ -69,4 +70,6 @@ trait TurnOffRecoverOnStart { this: Processor ⇒ override def preStart(): Unit = () } +class TestException(msg: String) extends Exception(msg) with NoStackTrace + case object GetState diff --git a/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala index c9eb14fc7a..0b8dc56f4a 100644 --- a/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala @@ -4,6 +4,8 @@ package akka.persistence +import scala.collection.immutable.Seq + import com.typesafe.config._ import akka.actor._ @@ -13,8 +15,8 @@ object ProcessorSpec { class RecoverTestProcessor(name: String) extends NamedProcessor(name) { var state = List.empty[String] def receive = { - case "boom" ⇒ throw new Exception("boom") - case Persistent("boom", _) ⇒ throw new Exception("boom") + case "boom" ⇒ throw new TestException("boom") + case Persistent("boom", _) ⇒ throw new TestException("boom") case Persistent(payload, snr) ⇒ state = s"${payload}-${snr}" :: state case GetState ⇒ sender ! state.reverse } @@ -83,7 +85,7 @@ object ProcessorSpec { } } - class ResumeTestException extends Exception("test") + class ResumeTestException extends TestException("test") class ResumeTestSupervisor(name: String) extends Actor { val processor = context.actorOf(Props(classOf[ResumeTestProcessor], name)) @@ -119,7 +121,7 @@ object ProcessorSpec { class AnyReplayedMsgFailsTestProcessor(name: String) extends RecoverTestProcessor(name) { val failOnReplayedA: Actor.Receive = { - case Persistent("a", _) if recoveryRunning ⇒ throw new Exception("boom") + case Persistent("a", _) if recoveryRunning ⇒ throw new TestException("boom") } override def receive = failOnReplayedA orElse super.receive @@ -283,6 +285,13 @@ abstract class ProcessorSpec(config: Config) extends AkkaSpec(config) with Persi processor ! GetState expectMsg(List("b-1", "c-3", "d-4", "e-5", "f-6", "g-7", "h-8", "i-9", "j-10")) } + "support batch writes" in { + val processor = namedProcessor[RecoverTestProcessor] + processor ! PersistentBatch(Seq(Persistent("c"), Persistent("d"), Persistent("e"))) + processor ! Persistent("f") + processor ! GetState + expectMsg(List("a-1", "b-2", "c-3", "d-4", "e-5", "f-6")) + } } "A processor" can { diff --git a/akka-persistence/src/test/scala/akka/persistence/ProcessorStashSpec.scala b/akka-persistence/src/test/scala/akka/persistence/ProcessorStashSpec.scala index 023bd3402c..08cf455c49 100644 --- a/akka-persistence/src/test/scala/akka/persistence/ProcessorStashSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/ProcessorStashSpec.scala @@ -18,8 +18,8 @@ object ProcessorStashSpec { case Persistent("b", snr) ⇒ update("b", snr) case Persistent("c", snr) ⇒ update("c", snr); unstashAll() case "x" ⇒ update("x") - case "boom" ⇒ throw new Exception("boom") - case Persistent("boom", _) ⇒ throw new Exception("boom") + case "boom" ⇒ throw new TestException("boom") + case Persistent("boom", _) ⇒ throw new TestException("boom") case GetState ⇒ sender ! state.reverse } diff --git a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala index 17a2d2633d..402770906b 100644 --- a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala @@ -1,5 +1,7 @@ package akka.persistence.serialization +import scala.collection.immutable + import com.typesafe.config._ import akka.actor._ @@ -118,8 +120,9 @@ object MessageSerializerRemotingSpec { class RemoteActor extends Actor { def receive = { - case Persistent(MyPayload(data), _) ⇒ sender ! data - case Confirm(pid, snr, cid) ⇒ sender ! s"${pid},${snr},${cid}" + case PersistentBatch(Persistent(MyPayload(data), _) +: tail) ⇒ sender ! data + case Persistent(MyPayload(data), _) ⇒ sender ! data + case Confirm(pid, snr, cid) ⇒ sender ! s"${pid},${snr},${cid}" } } @@ -147,10 +150,13 @@ class MessageSerializerRemotingSpec extends AkkaSpec(config(systemA).withFallbac localActor ! Persistent(MyPayload("a")) expectMsg(".a.") } + "custom-serialize persistent message batches during remoting" in { + localActor ! PersistentBatch(immutable.Seq(Persistent(MyPayload("a")))) + expectMsg(".a.") + } "serialize confirmation messages during remoting" in { localActor ! Confirm("a", 2, "b") expectMsg("a,2,b") - } } }