!per #15377 Mandate atomic writes for persistAll, and support rejections

* changing Plugin API for asyncWriteMessages and writeMessages
* passing explicit AtomicWrite that represents the events of
  persistAll, or a single event from persist
* journal may reject events before storing them, and that
  will result in onPersistRejected (logging) and continue in the
  persistent actor
* clarified the semantics with regards to batches and atomic writes,
  and failures and rejections in the api docs of asyncWriteMessages
  and writeMessages
* adjust the Java plugin API, asyncReplayMessages, doLoadAsync
This commit is contained in:
Patrik Nordwall 2015-06-23 21:01:36 +02:00
parent 33ee447ec9
commit 8c47e01e9d
38 changed files with 1500 additions and 216 deletions

View file

@ -5,13 +5,12 @@
package akka.persistence.serialization
import akka.actor.{ ActorPath, ExtendedActorSystem }
import akka.persistence.AtLeastOnceDelivery.{ AtLeastOnceDeliverySnapshot AtLeastOnceDeliverySnap, UnconfirmedDelivery }
import akka.persistence.AtLeastOnceDelivery._
import akka.persistence._
import akka.persistence.fsm.PersistentFsmActor.StateChangeEvent
import akka.persistence.serialization.MessageFormats._
import akka.persistence.serialization.{ MessageFormats mf }
import akka.serialization._
import com.google.protobuf._
import scala.collection.immutable.VectorBuilder
import scala.concurrent.duration
import akka.actor.Actor
@ -29,9 +28,10 @@ trait Message extends Serializable
class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer {
import PersistentRepr.Undefined
val AtomicWriteClass = classOf[AtomicWrite]
val PersistentReprClass = classOf[PersistentRepr]
val PersistentImplClass = classOf[PersistentImpl]
val AtLeastOnceDeliverySnapshotClass = classOf[AtLeastOnceDeliverySnap]
val AtLeastOnceDeliverySnapshotClass = classOf[AtLeastOnceDeliverySnapshot]
val PersistentStateChangeEventClass = classOf[StateChangeEvent]
private lazy val serialization = SerializationExtension(system)
@ -49,10 +49,11 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
* message's payload to a matching `akka.serialization.Serializer`.
*/
def toBinary(o: AnyRef): Array[Byte] = o match {
case p: PersistentRepr persistentMessageBuilder(p).build().toByteArray
case a: AtLeastOnceDeliverySnap atLeastOnceDeliverySnapshotBuilder(a).build.toByteArray
case s: StateChangeEvent stateChangeBuilder(s).build.toByteArray
case _ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}")
case p: PersistentRepr persistentMessageBuilder(p).build().toByteArray
case a: AtomicWrite atomicWriteBuilder(a).build().toByteArray
case a: AtLeastOnceDeliverySnapshot atLeastOnceDeliverySnapshotBuilder(a).build.toByteArray
case s: StateChangeEvent stateChangeBuilder(s).build.toByteArray
case _ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}")
}
/**
@ -60,12 +61,13 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
* message's payload to a matching `akka.serialization.Serializer`.
*/
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): Message = manifest match {
case None persistent(PersistentMessage.parseFrom(bytes))
case None persistent(mf.PersistentMessage.parseFrom(bytes))
case Some(c) c match {
case PersistentImplClass persistent(PersistentMessage.parseFrom(bytes))
case PersistentReprClass persistent(PersistentMessage.parseFrom(bytes))
case AtLeastOnceDeliverySnapshotClass atLeastOnceDeliverySnapshot(AtLeastOnceDeliverySnapshot.parseFrom(bytes))
case PersistentStateChangeEventClass stateChange(PersistentStateChangeEvent.parseFrom(bytes))
case PersistentImplClass persistent(mf.PersistentMessage.parseFrom(bytes))
case PersistentReprClass persistent(mf.PersistentMessage.parseFrom(bytes))
case AtomicWriteClass atomicWrite(mf.AtomicWrite.parseFrom(bytes))
case AtLeastOnceDeliverySnapshotClass atLeastOnceDeliverySnapshot(mf.AtLeastOnceDeliverySnapshot.parseFrom(bytes))
case PersistentStateChangeEventClass stateChange(mf.PersistentStateChangeEvent.parseFrom(bytes))
case _ throw new IllegalArgumentException(s"Can't deserialize object of type ${c}")
}
}
@ -74,12 +76,12 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
// toBinary helpers
//
def atLeastOnceDeliverySnapshotBuilder(snap: AtLeastOnceDeliverySnap): AtLeastOnceDeliverySnapshot.Builder = {
val builder = AtLeastOnceDeliverySnapshot.newBuilder
def atLeastOnceDeliverySnapshotBuilder(snap: AtLeastOnceDeliverySnapshot): mf.AtLeastOnceDeliverySnapshot.Builder = {
val builder = mf.AtLeastOnceDeliverySnapshot.newBuilder
builder.setCurrentDeliveryId(snap.currentDeliveryId)
snap.unconfirmedDeliveries.foreach { unconfirmed
val unconfirmedBuilder =
AtLeastOnceDeliverySnapshot.UnconfirmedDelivery.newBuilder.
mf.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery.newBuilder.
setDeliveryId(unconfirmed.deliveryId).
setDestination(unconfirmed.destination.toString).
setPayload(persistentPayloadBuilder(unconfirmed.message.asInstanceOf[AnyRef]))
@ -88,15 +90,15 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
builder
}
def stateChangeBuilder(stateChange: StateChangeEvent): PersistentStateChangeEvent.Builder = {
val builder = PersistentStateChangeEvent.newBuilder.setStateIdentifier(stateChange.stateIdentifier)
private[persistence] def stateChangeBuilder(stateChange: StateChangeEvent): mf.PersistentStateChangeEvent.Builder = {
val builder = mf.PersistentStateChangeEvent.newBuilder.setStateIdentifier(stateChange.stateIdentifier)
stateChange.timeout match {
case None builder
case Some(timeout) builder.setTimeout(timeout.toString())
}
}
def atLeastOnceDeliverySnapshot(atLeastOnceDeliverySnapshot: AtLeastOnceDeliverySnapshot): AtLeastOnceDeliverySnap = {
def atLeastOnceDeliverySnapshot(atLeastOnceDeliverySnapshot: mf.AtLeastOnceDeliverySnapshot): AtLeastOnceDeliverySnapshot = {
import scala.collection.JavaConverters._
val unconfirmedDeliveries = new VectorBuilder[UnconfirmedDelivery]()
atLeastOnceDeliverySnapshot.getUnconfirmedDeliveriesList().iterator().asScala foreach { next
@ -104,19 +106,27 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
payload(next.getPayload))
}
AtLeastOnceDeliverySnap(
AtLeastOnceDeliverySnapshot(
atLeastOnceDeliverySnapshot.getCurrentDeliveryId,
unconfirmedDeliveries.result())
}
def stateChange(persistentStateChange: PersistentStateChangeEvent): StateChangeEvent = {
def stateChange(persistentStateChange: mf.PersistentStateChangeEvent): StateChangeEvent = {
StateChangeEvent(
persistentStateChange.getStateIdentifier,
if (persistentStateChange.hasTimeout) Some(Duration(persistentStateChange.getTimeout).asInstanceOf[duration.FiniteDuration]) else None)
}
private def atomicWriteBuilder(a: AtomicWrite) = {
val builder = mf.AtomicWrite.newBuilder
a.payload.foreach { p
builder.addPayload(persistentMessageBuilder(p))
}
builder
}
private def persistentMessageBuilder(persistent: PersistentRepr) = {
val builder = PersistentMessage.newBuilder
val builder = mf.PersistentMessage.newBuilder
if (persistent.persistenceId != Undefined) builder.setPersistenceId(persistent.persistenceId)
if (persistent.sender != Actor.noSender) builder.setSender(Serialization.serializedActorPath(persistent.sender))
@ -130,7 +140,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
private def persistentPayloadBuilder(payload: AnyRef) = {
def payloadBuilder() = {
val serializer = serialization.findSerializerFor(payload)
val builder = PersistentPayload.newBuilder()
val builder = mf.PersistentPayload.newBuilder()
serializer match {
case ser2: SerializerWithStringManifest
@ -158,7 +168,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
// fromBinary helpers
//
private def persistent(persistentMessage: PersistentMessage): PersistentRepr = {
private def persistent(persistentMessage: mf.PersistentMessage): PersistentRepr = {
PersistentRepr(
payload(persistentMessage.getPayload),
persistentMessage.getSequenceNr,
@ -168,7 +178,12 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
if (persistentMessage.hasSender) system.provider.resolveActorRef(persistentMessage.getSender) else Actor.noSender)
}
private def payload(persistentPayload: PersistentPayload): Any = {
private def atomicWrite(atomicWrite: mf.AtomicWrite): AtomicWrite = {
import scala.collection.JavaConverters._
AtomicWrite(atomicWrite.getPayloadList.asScala.map(persistent)(collection.breakOut))
}
private def payload(persistentPayload: mf.PersistentPayload): Any = {
val manifest = if (persistentPayload.hasPayloadManifest)
persistentPayload.getPayloadManifest.toStringUtf8 else ""