2013-10-09 13:11:53 +02:00
|
|
|
/**
|
2014-02-02 19:05:45 -06:00
|
|
|
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
2013-10-09 13:11:53 +02:00
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
package akka.persistence.serialization
|
|
|
|
|
|
|
|
|
|
import scala.language.existentials
|
|
|
|
|
|
|
|
|
|
import com.google.protobuf._
|
|
|
|
|
|
2014-01-17 06:58:25 +01:00
|
|
|
import akka.actor.{ ActorPath, ExtendedActorSystem }
|
2013-10-27 08:01:14 +01:00
|
|
|
import akka.japi.Util.immutableSeq
|
2013-10-09 13:11:53 +02:00
|
|
|
import akka.persistence._
|
|
|
|
|
import akka.persistence.serialization.MessageFormats._
|
|
|
|
|
import akka.serialization._
|
|
|
|
|
|
|
|
|
|
/**
 * Marker trait implemented by every message type in `akka.persistence` that is
 * serialized with protobuf.
 *
 * The trait declares no members of its own; it only tags a type and makes it
 * `Serializable`.
 */
trait Message extends Serializable
|
|
|
|
|
|
|
|
|
|
/**
 * Protobuf serializer for [[PersistentBatch]], [[PersistentRepr]], [[Delivered]]
 * ([[DeliveredByChannel]] and [[DeliveredByPersistentChannel]]) and [[Deliver]] messages.
 *
 * @param system the actor system used to look up payload serializers, resolve actor
 *               references and load payload classes
 */
class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
  import PersistentRepr.Undefined

  // Manifest classes recognised by `fromBinary`. They are upper-case vals so that they
  // can be used as stable identifier patterns in a `match`.
  val PersistentBatchClass = classOf[PersistentBatch]
  val PersistentReprClass = classOf[PersistentRepr]
  val PersistentImplClass = classOf[PersistentImpl]
  val ConfirmablePersistentImplClass = classOf[ConfirmablePersistentImpl]
  val DeliveredByTransientChannelClass = classOf[DeliveredByChannel]
  val DeliveredByPersistentChannelClass = classOf[DeliveredByPersistentChannel]
  val DeliverClass = classOf[Deliver]

  def identifier: Int = 7
  def includeManifest: Boolean = true

  /**
   * Transport information built from the system's default address, or `None` when that
   * address has local scope. Evaluated on first use.
   */
  private lazy val transportInformation: Option[Serialization.Information] = {
    val address = system.provider.getDefaultAddress
    if (address.hasLocalScope) None
    else Some(Serialization.Information(address, system))
  }

  /**
   * Serializes [[PersistentBatch]], [[PersistentRepr]], [[DeliveredByChannel]],
   * [[DeliveredByPersistentChannel]] and [[Deliver]] messages. Delegates
   * serialization of a persistent message's payload to a matching `akka.serialization.Serializer`.
   *
   * @param o the message to serialize
   * @return the protobuf-encoded bytes of `o`
   * @throws IllegalArgumentException if `o` is not one of the supported message types
   */
  def toBinary(o: AnyRef): Array[Byte] = o match {
    case b: PersistentBatch ⇒ persistentMessageBatchBuilder(b).build().toByteArray
    case p: PersistentRepr ⇒ persistentMessageBuilder(p).build().toByteArray
    case c: DeliveredByChannel ⇒ deliveredMessageBuilder(c).build().toByteArray
    case c: DeliveredByPersistentChannel ⇒ deliveredMessageBuilder(c).build().toByteArray
    case d: Deliver ⇒ deliverMessageBuilder(d).build().toByteArray
    case _ ⇒ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}")
  }

  /**
   * Deserializes [[PersistentBatch]], [[PersistentRepr]], [[Delivered]] and [[Deliver]]
   * messages. Delegates deserialization of a persistent message's payload to a matching
   * `akka.serialization.Serializer`.
   *
   * @param bytes    the protobuf-encoded message
   * @param manifest the class the bytes were serialized from; when absent the bytes are
   *                 parsed as a persistent message
   * @return the deserialized message
   * @throws IllegalArgumentException if `manifest` names a class this serializer does not handle
   */
  def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): Message = manifest match {
    case None ⇒ persistent(PersistentMessage.parseFrom(bytes))
    case Some(c) ⇒ c match {
      // All persistent message manifests share the same wire format.
      case PersistentImplClass | ConfirmablePersistentImplClass | PersistentReprClass ⇒
        persistent(PersistentMessage.parseFrom(bytes))
      case PersistentBatchClass ⇒
        persistentBatch(PersistentMessageBatch.parseFrom(bytes))
      // Both delivered variants share one wire format; `delivered` picks the concrete type.
      case DeliveredByTransientChannelClass | DeliveredByPersistentChannelClass ⇒
        delivered(DeliveredMessage.parseFrom(bytes))
      case DeliverClass ⇒
        deliver(DeliverMessage.parseFrom(bytes))
      case _ ⇒
        throw new IllegalArgumentException(s"Can't deserialize object of type ${c}")
    }
  }

  //
  // toBinary helpers
  //

  /** Builds the protobuf form of a [[Deliver]] request: its persistent message and destination path. */
  private def deliverMessageBuilder(deliver: Deliver) = {
    val builder = DeliverMessage.newBuilder
    // NOTE(review): the cast assumes every persistent message passed here is a PersistentRepr — confirm.
    builder.setPersistent(persistentMessageBuilder(deliver.persistent.asInstanceOf[PersistentRepr]))
    builder.setDestination(deliver.destination.toString)
    builder
  }

  /**
   * Builds the protobuf form of a [[PersistentBatch]]. Only batch entries that are
   * [[PersistentRepr]] instances are written; all other entries are skipped.
   */
  private def persistentMessageBatchBuilder(persistentBatch: PersistentBatch) = {
    val builder = PersistentMessageBatch.newBuilder
    // A single type-pattern traversal replaces filter(isInstanceOf) + asInstanceOf:
    // no intermediate collection and no cast.
    persistentBatch.batch.foreach {
      case p: PersistentRepr ⇒ builder.addBatch(persistentMessageBuilder(p))
      case _ ⇒ () // not a PersistentRepr: not part of the serialized batch
    }
    builder
  }

  /**
   * Builds the protobuf form of a [[PersistentRepr]]. Optional fields (persistence id,
   * confirm message, confirm target, sender) are only written when they are set.
   */
  private def persistentMessageBuilder(persistent: PersistentRepr) = {
    val builder = PersistentMessage.newBuilder

    if (persistent.persistenceId != Undefined) builder.setPersistenceId(persistent.persistenceId)
    if (persistent.confirmMessage != null) builder.setConfirmMessage(deliveredMessageBuilder(persistent.confirmMessage))
    if (persistent.confirmTarget != null) builder.setConfirmTarget(Serialization.serializedActorPath(persistent.confirmTarget))
    if (persistent.sender != null) builder.setSender(Serialization.serializedActorPath(persistent.sender))

    persistent.confirms.foreach(builder.addConfirms)

    builder.setPayload(persistentPayloadBuilder(persistent.payload.asInstanceOf[AnyRef]))
    builder.setSequenceNr(persistent.sequenceNr)
    builder.setDeleted(persistent.deleted)
    builder.setRedeliveries(persistent.redeliveries)
    builder.setConfirmable(persistent.confirmable)
    builder
  }

  /**
   * Builds the protobuf wrapper for a payload: the payload bytes produced by the serializer
   * found for it, that serializer's identifier and, if the serializer asks for a manifest,
   * the payload's class name.
   */
  private def persistentPayloadBuilder(payload: AnyRef) = {
    def payloadBuilder() = {
      val serializer = SerializationExtension(system).findSerializerFor(payload)
      val builder = PersistentPayload.newBuilder()

      if (serializer.includeManifest) builder.setPayloadManifest(ByteString.copyFromUtf8(payload.getClass.getName))

      builder.setPayload(ByteString.copyFrom(serializer.toBinary(payload)))
      builder.setSerializerId(serializer.identifier)
      builder
    }

    // serialize actor references with full address information (defaultAddress)
    transportInformation match {
      case Some(ti) ⇒ Serialization.currentTransportInformation.withValue(ti) { payloadBuilder() }
      case None ⇒ payloadBuilder()
    }
  }

  /**
   * Builds the protobuf form of a [[Delivered]] confirmation. The channel is only written
   * when it is set, and the persistence id only for [[DeliveredByChannel]].
   */
  private def deliveredMessageBuilder(delivered: Delivered) = {
    val builder = DeliveredMessage.newBuilder

    if (delivered.channel != null) builder.setChannel(Serialization.serializedActorPath(delivered.channel))

    builder.setChannelId(delivered.channelId)
    builder.setPersistentSequenceNr(delivered.persistentSequenceNr)
    builder.setDeliverySequenceNr(delivered.deliverySequenceNr)

    // The presence of the persistence id is what `delivered` later uses to tell the two variants apart.
    delivered match {
      case c: DeliveredByChannel ⇒ builder.setPersistenceId(c.persistenceId)
      case _ ⇒ builder
    }
  }

  //
  // fromBinary helpers
  //

  /** Rebuilds a [[Deliver]] request from its protobuf form. */
  private def deliver(deliverMessage: DeliverMessage): Deliver = {
    Deliver(
      persistent(deliverMessage.getPersistent),
      ActorPath.fromString(deliverMessage.getDestination))
  }

  /** Rebuilds a [[PersistentBatch]] by converting every entry of the protobuf batch list. */
  private def persistentBatch(persistentMessageBatch: PersistentMessageBatch): PersistentBatch =
    PersistentBatch(immutableSeq(persistentMessageBatch.getBatchList).map(persistent))

  /**
   * Rebuilds a [[PersistentRepr]] from its protobuf form. Optional fields that are absent
   * become `Undefined` (persistence id) or `null` (confirm message, confirm target, sender).
   */
  private def persistent(persistentMessage: PersistentMessage): PersistentRepr = {
    PersistentRepr(
      payload(persistentMessage.getPayload),
      persistentMessage.getSequenceNr,
      if (persistentMessage.hasPersistenceId) persistentMessage.getPersistenceId else Undefined,
      persistentMessage.getDeleted,
      persistentMessage.getRedeliveries,
      immutableSeq(persistentMessage.getConfirmsList),
      persistentMessage.getConfirmable,
      if (persistentMessage.hasConfirmMessage) delivered(persistentMessage.getConfirmMessage) else null,
      if (persistentMessage.hasConfirmTarget) system.provider.resolveActorRef(persistentMessage.getConfirmTarget) else null,
      if (persistentMessage.hasSender) system.provider.resolveActorRef(persistentMessage.getSender) else null)
  }

  /**
   * Deserializes a payload with the serializer identified by the stored serializer id,
   * passing the payload class when a manifest was stored.
   *
   * Both the class lookup and the deserialization result are unwrapped with `.get`, so a
   * failure in either is expected to surface as an exception from this method.
   */
  private def payload(persistentPayload: PersistentPayload): Any = {
    val payloadClass =
      if (persistentPayload.hasPayloadManifest)
        Some(system.dynamicAccess.getClassFor[AnyRef](persistentPayload.getPayloadManifest.toStringUtf8).get)
      else None

    SerializationExtension(system).deserialize(
      persistentPayload.getPayload.toByteArray,
      persistentPayload.getSerializerId,
      payloadClass).get
  }

  /**
   * Rebuilds a [[Delivered]] confirmation from its protobuf form: a [[DeliveredByChannel]]
   * when a persistence id is present, otherwise a [[DeliveredByPersistentChannel]].
   */
  private def delivered(deliveredMessage: DeliveredMessage): Delivered = {
    val channel = if (deliveredMessage.hasChannel) system.provider.resolveActorRef(deliveredMessage.getChannel) else null

    if (deliveredMessage.hasPersistenceId) {
      DeliveredByChannel(
        deliveredMessage.getPersistenceId,
        deliveredMessage.getChannelId,
        deliveredMessage.getPersistentSequenceNr,
        deliveredMessage.getDeliverySequenceNr,
        channel)
    } else {
      DeliveredByPersistentChannel(
        deliveredMessage.getChannelId,
        deliveredMessage.getPersistentSequenceNr,
        deliveredMessage.getDeliverySequenceNr,
        channel)
    }
  }
}
|