pekko/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala

139 lines
5.5 KiB
Scala
Raw Normal View History

/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.serialization
import scala.language.existentials
import com.google.protobuf._
import akka.actor.ExtendedActorSystem
import akka.japi.Util.immutableSeq
import akka.persistence._
import akka.persistence.serialization.MessageFormats._
import akka.serialization._
/**
* Protobuf serializer for [[Persistent]] and `Confirm` messages.
*/
class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
import PersistentImpl.Undefined
val PersistentBatchClass = classOf[PersistentBatch]
val PersistentClass = classOf[PersistentImpl]
val ConfirmClass = classOf[Confirm]
def identifier: Int = 7
def includeManifest: Boolean = true
/**
* Serializes [[PersistentBatch]] and [[Persistent]]. Delegates serialization of a
* persistent message's payload to a matching `akka.serialization.Serializer`.
*/
def toBinary(o: AnyRef): Array[Byte] = o match {
case b: PersistentBatch persistentMessageBatchBuilder(b).build().toByteArray
case p: PersistentImpl persistentMessageBuilder(p).build().toByteArray
case c: Confirm confirmMessageBuilder(c).build().toByteArray
case _ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}")
}
/**
* Deserializes [[PersistentBatch]] and [[Persistent]]. Delegates deserialization of a
* persistent message's payload to a matching `akka.serialization.Serializer`.
*/
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = manifest match {
case None persistent(PersistentMessage.parseFrom(bytes))
case Some(c) c match {
case PersistentBatchClass persistentBatch(PersistentMessageBatch.parseFrom(bytes))
case PersistentClass persistent(PersistentMessage.parseFrom(bytes))
case ConfirmClass confirm(ConfirmMessage.parseFrom(bytes))
case _ throw new IllegalArgumentException(s"Can't deserialize object of type ${c}")
}
}
//
// toBinary helpers
//
private def persistentMessageBatchBuilder(persistentBatch: PersistentBatch) = {
val builder = PersistentMessageBatch.newBuilder
persistentBatch.persistentImplList.foreach(p builder.addBatch(persistentMessageBuilder(p)))
builder
}
private def persistentMessageBuilder(persistent: PersistentImpl) = {
val builder = PersistentMessage.newBuilder
if (persistent.processorId != Undefined) builder.setProcessorId(persistent.processorId)
if (persistent.channelId != Undefined) builder.setChannelId(persistent.channelId)
if (persistent.confirmMessage != null) builder.setConfirmMessage(confirmMessageBuilder(persistent.confirmMessage))
if (persistent.confirmTarget != null) builder.setConfirmTarget(Serialization.serializedActorPath(persistent.confirmTarget))
if (persistent.sender != null) builder.setSender(Serialization.serializedActorPath(persistent.sender))
persistent.confirms.foreach(builder.addConfirms)
builder.setPayload(persistentPayloadBuilder(persistent.payload.asInstanceOf[AnyRef]))
builder.setSequenceNr(persistent.sequenceNr)
builder.setDeleted(persistent.deleted)
builder.setResolved(persistent.resolved)
builder
}
private def persistentPayloadBuilder(payload: AnyRef) = {
val serializer = SerializationExtension(system).findSerializerFor(payload)
val builder = PersistentPayload.newBuilder()
if (serializer.includeManifest) builder.setPayloadManifest((ByteString.copyFromUtf8(payload.getClass.getName)))
builder.setPayload(ByteString.copyFrom(serializer.toBinary(payload)))
builder.setSerializerId(serializer.identifier)
builder
}
private def confirmMessageBuilder(confirm: Confirm) = {
ConfirmMessage.newBuilder
.setProcessorId(confirm.processorId)
.setSequenceNr(confirm.sequenceNr)
.setChannelId(confirm.channelId)
}
//
// fromBinary helpers
//
private def persistentBatch(persistentMessageBatch: PersistentMessageBatch): PersistentBatch =
PersistentBatch(immutableSeq(persistentMessageBatch.getBatchList).map(persistent))
private def persistent(persistentMessage: PersistentMessage): PersistentImpl = {
PersistentImpl(
payload(persistentMessage.getPayload),
persistentMessage.getSequenceNr,
if (persistentMessage.hasProcessorId) persistentMessage.getProcessorId else Undefined,
if (persistentMessage.hasChannelId) persistentMessage.getChannelId else Undefined,
persistentMessage.getDeleted,
persistentMessage.getResolved,
immutableSeq(persistentMessage.getConfirmsList),
if (persistentMessage.hasConfirmMessage) confirm(persistentMessage.getConfirmMessage) else null,
if (persistentMessage.hasConfirmTarget) system.provider.resolveActorRef(persistentMessage.getConfirmTarget) else null,
if (persistentMessage.hasSender) system.provider.resolveActorRef(persistentMessage.getSender) else null)
}
private def payload(persistentPayload: PersistentPayload): Any = {
val payloadClass = if (persistentPayload.hasPayloadManifest)
Some(system.dynamicAccess.getClassFor[AnyRef](persistentPayload.getPayloadManifest.toStringUtf8).get) else None
SerializationExtension(system).deserialize(
persistentPayload.getPayload.toByteArray,
persistentPayload.getSerializerId,
payloadClass).get
}
private def confirm(confirmMessage: ConfirmMessage): Confirm = {
Confirm(
confirmMessage.getProcessorId,
confirmMessage.getSequenceNr,
confirmMessage.getChannelId)
}
}