!per #3681 Performance and consistency improvements

- batch-write of persistent messages (user API)
- batch-write of events (in EventsourcedProcessor)
- command processing in EventsourcedProcessor by unstashing messages one-by-one from the internal stash
   * fixes performance issues that come up with unstashAll
- commands are not looped through journal actor but processed directly
- initial performance tests
  * command sourcing
  * event sourcing
  * event sourcing with user stash operations
- suppress stack traces in tests
This commit is contained in:
Martin Krasser 2013-10-27 08:01:14 +01:00
parent 8eeaadfee0
commit 1da3369643
29 changed files with 1324 additions and 76 deletions

View file

@ -9,6 +9,7 @@ import scala.language.existentials
import com.google.protobuf._
import akka.actor.ExtendedActorSystem
import akka.japi.Util.immutableSeq
import akka.persistence._
import akka.persistence.serialization.MessageFormats._
import akka.serialization._
@ -19,6 +20,7 @@ import akka.serialization._
class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
import PersistentImpl.Undefined
val PersistentBatchClass = classOf[PersistentBatch]
val PersistentClass = classOf[PersistentImpl]
val ConfirmClass = classOf[Confirm]
@ -26,25 +28,27 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
def includeManifest: Boolean = true
/**
* Serializes a [[Persistent]] message. Delegates serialization of the persistent message's
* payload to a matching `akka.serialization.Serializer`.
* Serializes [[PersistentBatch]] and [[Persistent]]. Delegates serialization of a
* persistent message's payload to a matching `akka.serialization.Serializer`.
*/
def toBinary(o: AnyRef): Array[Byte] = o match {
case p: PersistentImpl persistentMessageBuilder(p).build().toByteArray
case c: Confirm confirmMessageBuilder(c).build().toByteArray
case _ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}")
case b: PersistentBatch persistentMessageBatchBuilder(b).build().toByteArray
case p: PersistentImpl persistentMessageBuilder(p).build().toByteArray
case c: Confirm confirmMessageBuilder(c).build().toByteArray
case _ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}")
}
/**
* Deserializes a [[Persistent]] message. Delegates deserialization of the persistent message's
* payload to a matching `akka.serialization.Serializer`.
* Deserializes [[PersistentBatch]] and [[Persistent]]. Delegates deserialization of a
* persistent message's payload to a matching `akka.serialization.Serializer`.
*/
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = manifest match {
case None persistent(PersistentMessage.parseFrom(bytes))
case Some(c) c match {
case PersistentClass persistent(PersistentMessage.parseFrom(bytes))
case ConfirmClass confirm(ConfirmMessage.parseFrom(bytes))
case _ throw new IllegalArgumentException(s"Can't deserialize object of type ${c}")
case PersistentBatchClass persistentBatch(PersistentMessageBatch.parseFrom(bytes))
case PersistentClass persistent(PersistentMessage.parseFrom(bytes))
case ConfirmClass confirm(ConfirmMessage.parseFrom(bytes))
case _ throw new IllegalArgumentException(s"Can't deserialize object of type ${c}")
}
}
@ -52,6 +56,12 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
// toBinary helpers
//
private def persistentMessageBatchBuilder(persistentBatch: PersistentBatch) = {
val builder = PersistentMessageBatch.newBuilder
persistentBatch.persistentImplList.foreach(p builder.addBatch(persistentMessageBuilder(p)))
builder
}
private def persistentMessageBuilder(persistent: PersistentImpl) = {
val builder = PersistentMessage.newBuilder
@ -92,8 +102,10 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
// fromBinary helpers
//
private def persistentBatch(persistentMessageBatch: PersistentMessageBatch): PersistentBatch =
PersistentBatch(immutableSeq(persistentMessageBatch.getBatchList).map(persistent))
private def persistent(persistentMessage: PersistentMessage): PersistentImpl = {
import scala.collection.JavaConverters._
PersistentImpl(
payload(persistentMessage.getPayload),
persistentMessage.getSequenceNr,
@ -101,7 +113,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
if (persistentMessage.hasChannelId) persistentMessage.getChannelId else Undefined,
persistentMessage.getDeleted,
persistentMessage.getResolved,
persistentMessage.getConfirmsList.asScala.toList,
immutableSeq(persistentMessage.getConfirmsList),
if (persistentMessage.hasConfirmMessage) confirm(persistentMessage.getConfirmMessage) else null,
if (persistentMessage.hasConfirmTarget) system.provider.resolveActorRef(persistentMessage.getConfirmTarget) else null,
if (persistentMessage.hasSender) system.provider.resolveActorRef(persistentMessage.getSender) else null)