pekko/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala

153 lines
6.3 KiB
Scala
Raw Normal View History

/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.serialization
import scala.language.existentials
import com.google.protobuf._
import akka.actor.{ ActorPath, ExtendedActorSystem }
import akka.japi.Util.immutableSeq
import akka.persistence._
import akka.persistence.serialization.MessageFormats._
import akka.serialization._
import akka.persistence.AtLeastOnceDelivery.{ AtLeastOnceDeliverySnapshot AtLeastOnceDeliverySnap }
import akka.persistence.AtLeastOnceDelivery.UnconfirmedDelivery
import scala.collection.immutable.VectorBuilder
/**
* Marker trait for all protobuf-serializable messages in `akka.persistence`.
*/
trait Message extends Serializable
/**
* Protobuf serializer for [[PersistentRepr]] and [[AtLeastOnceDelivery]] messages.
*/
class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
import PersistentRepr.Undefined
val PersistentReprClass = classOf[PersistentRepr]
val PersistentImplClass = classOf[PersistentImpl]
val AtLeastOnceDeliverySnapshotClass = classOf[AtLeastOnceDeliverySnap]
val SerializationIdentifiers = "akka.actor.serialization-identifiers" // TODO move to [[Serializer]]
override val identifier: Int = system.settings.config.getInt(s"""${SerializationIdentifiers}."${getClass.getName}"""")
override val includeManifest: Boolean = true
private lazy val transportInformation: Option[Serialization.Information] = {
val address = system.provider.getDefaultAddress
if (address.hasLocalScope) None
else Some(Serialization.Information(address, system))
}
/**
* Serializes persistent messages. Delegates serialization of a persistent
* message's payload to a matching `akka.serialization.Serializer`.
*/
def toBinary(o: AnyRef): Array[Byte] = o match {
case p: PersistentRepr persistentMessageBuilder(p).build().toByteArray
case a: AtLeastOnceDeliverySnap atLeastOnceDeliverySnapshotBuilder(a).build.toByteArray
case _ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}")
}
/**
* Deserializes persistent messages. Delegates deserialization of a persistent
* message's payload to a matching `akka.serialization.Serializer`.
*/
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): Message = manifest match {
case None persistent(PersistentMessage.parseFrom(bytes))
case Some(c) c match {
case PersistentImplClass persistent(PersistentMessage.parseFrom(bytes))
case PersistentReprClass persistent(PersistentMessage.parseFrom(bytes))
case AtLeastOnceDeliverySnapshotClass atLeastOnceDeliverySnapshot(AtLeastOnceDeliverySnapshot.parseFrom(bytes))
case _ throw new IllegalArgumentException(s"Can't deserialize object of type ${c}")
}
}
//
// toBinary helpers
//
def atLeastOnceDeliverySnapshotBuilder(snap: AtLeastOnceDeliverySnap): AtLeastOnceDeliverySnapshot.Builder = {
val builder = AtLeastOnceDeliverySnapshot.newBuilder
builder.setCurrentDeliveryId(snap.currentDeliveryId)
snap.unconfirmedDeliveries.foreach { unconfirmed
val unconfirmedBuilder =
AtLeastOnceDeliverySnapshot.UnconfirmedDelivery.newBuilder.
setDeliveryId(unconfirmed.deliveryId).
setDestination(unconfirmed.destination.toString).
setPayload(persistentPayloadBuilder(unconfirmed.message.asInstanceOf[AnyRef]))
builder.addUnconfirmedDeliveries(unconfirmedBuilder)
}
builder
}
def atLeastOnceDeliverySnapshot(atLeastOnceDeliverySnapshot: AtLeastOnceDeliverySnapshot): AtLeastOnceDeliverySnap = {
import scala.collection.JavaConverters._
val unconfirmedDeliveries = new VectorBuilder[UnconfirmedDelivery]()
atLeastOnceDeliverySnapshot.getUnconfirmedDeliveriesList().iterator().asScala foreach { next
unconfirmedDeliveries += UnconfirmedDelivery(next.getDeliveryId, ActorPath.fromString(next.getDestination),
payload(next.getPayload))
}
AtLeastOnceDeliverySnap(
atLeastOnceDeliverySnapshot.getCurrentDeliveryId,
unconfirmedDeliveries.result())
}
private def persistentMessageBuilder(persistent: PersistentRepr) = {
val builder = PersistentMessage.newBuilder
!per #15230 rename processorId => persistentId * This is NOT binary compatible, we're in an *experimental* module. * disabled binary compat checks for package akka.persistence * Source compatibility is retained, but users should migrate do the new method name ASAP. * Plugin APIs were migrated in a way that allows the old plugins to compile agains 2.3.4 without having to change anything. Hopefuly this will help authors migrate to 2.3.4 sooner. This is only source level compatible, not binary compatible. * added deprecation warnings on all processorId methods and provided bridges where possible * for users, the migration should be painless, they can still override the old method, and it'll work. But we encourage them to move to persistenceId; All delegation code will have to be removed afterwards ofc. Conflicts: akka-persistence/src/main/scala/akka/persistence/Channel.scala akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala akka-persistence/src/main/scala/akka/persistence/Persistent.scala akka-persistence/src/main/scala/akka/persistence/PersistentChannel.scala akka-persistence/src/main/scala/akka/persistence/Processor.scala akka-persistence/src/main/scala/akka/persistence/Snapshot.scala akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbKey.scala akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala project/AkkaBuild.scala
2014-06-23 14:33:35 +02:00
if (persistent.persistenceId != Undefined) builder.setPersistenceId(persistent.persistenceId)
if (persistent.sender != null) builder.setSender(Serialization.serializedActorPath(persistent.sender))
builder.setPayload(persistentPayloadBuilder(persistent.payload.asInstanceOf[AnyRef]))
builder.setSequenceNr(persistent.sequenceNr)
builder.setDeleted(persistent.deleted)
builder
}
private def persistentPayloadBuilder(payload: AnyRef) = {
def payloadBuilder() = {
val serializer = SerializationExtension(system).findSerializerFor(payload)
val builder = PersistentPayload.newBuilder()
!per #15230 rename processorId => persistentId * This is NOT binary compatible, we're in an *experimental* module. * disabled binary compat checks for package akka.persistence * Source compatibility is retained, but users should migrate do the new method name ASAP. * Plugin APIs were migrated in a way that allows the old plugins to compile agains 2.3.4 without having to change anything. Hopefuly this will help authors migrate to 2.3.4 sooner. This is only source level compatible, not binary compatible. * added deprecation warnings on all processorId methods and provided bridges where possible * for users, the migration should be painless, they can still override the old method, and it'll work. But we encourage them to move to persistenceId; All delegation code will have to be removed afterwards ofc. Conflicts: akka-persistence/src/main/scala/akka/persistence/Channel.scala akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala akka-persistence/src/main/scala/akka/persistence/Persistent.scala akka-persistence/src/main/scala/akka/persistence/PersistentChannel.scala akka-persistence/src/main/scala/akka/persistence/Processor.scala akka-persistence/src/main/scala/akka/persistence/Snapshot.scala akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbKey.scala akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala project/AkkaBuild.scala
2014-06-23 14:33:35 +02:00
if (serializer.includeManifest) builder.setPayloadManifest(ByteString.copyFromUtf8(payload.getClass.getName))
builder.setPayload(ByteString.copyFrom(serializer.toBinary(payload)))
builder.setSerializerId(serializer.identifier)
builder
}
// serialize actor references with full address information (defaultAddress)
transportInformation match {
case Some(ti) Serialization.currentTransportInformation.withValue(ti) { payloadBuilder() }
case None payloadBuilder()
}
}
//
// fromBinary helpers
//
private def persistent(persistentMessage: PersistentMessage): PersistentRepr = {
PersistentRepr(
payload(persistentMessage.getPayload),
persistentMessage.getSequenceNr,
!per #15230 rename processorId => persistentId * This is NOT binary compatible, we're in an *experimental* module. * disabled binary compat checks for package akka.persistence * Source compatibility is retained, but users should migrate do the new method name ASAP. * Plugin APIs were migrated in a way that allows the old plugins to compile agains 2.3.4 without having to change anything. Hopefuly this will help authors migrate to 2.3.4 sooner. This is only source level compatible, not binary compatible. * added deprecation warnings on all processorId methods and provided bridges where possible * for users, the migration should be painless, they can still override the old method, and it'll work. But we encourage them to move to persistenceId; All delegation code will have to be removed afterwards ofc. Conflicts: akka-persistence/src/main/scala/akka/persistence/Channel.scala akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala akka-persistence/src/main/scala/akka/persistence/Persistent.scala akka-persistence/src/main/scala/akka/persistence/PersistentChannel.scala akka-persistence/src/main/scala/akka/persistence/Processor.scala akka-persistence/src/main/scala/akka/persistence/Snapshot.scala akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbKey.scala akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala project/AkkaBuild.scala
2014-06-23 14:33:35 +02:00
if (persistentMessage.hasPersistenceId) persistentMessage.getPersistenceId else Undefined,
persistentMessage.getDeleted,
if (persistentMessage.hasSender) system.provider.resolveActorRef(persistentMessage.getSender) else null)
}
private def payload(persistentPayload: PersistentPayload): Any = {
val payloadClass = if (persistentPayload.hasPayloadManifest)
Some(system.dynamicAccess.getClassFor[AnyRef](persistentPayload.getPayloadManifest.toStringUtf8).get) else None
SerializationExtension(system).deserialize(
persistentPayload.getPayload.toByteArray,
persistentPayload.getSerializerId,
payloadClass).get
}
}