pekko/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala

199 lines
8.3 KiB
Scala
Raw Normal View History

/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.serialization
import akka.actor.{ ActorPath, ExtendedActorSystem }
import akka.persistence.AtLeastOnceDelivery._
import akka.persistence._
import akka.persistence.fsm.PersistentFsmActor.StateChangeEvent
import akka.persistence.serialization.{ MessageFormats mf }
import akka.serialization._
import com.google.protobuf._
import scala.collection.immutable.VectorBuilder
import scala.concurrent.duration
import akka.actor.Actor
import scala.concurrent.duration.Duration
import scala.language.existentials
/**
* Marker trait for all protobuf-serializable messages in `akka.persistence`.
*/
trait Message extends Serializable
/**
2014-11-09 14:12:36 +02:00
* Protobuf serializer for [[akka.persistence.PersistentRepr]], [[akka.persistence.AtLeastOnceDelivery]] and [[akka.persistence.fsm.PersistentFsmActor.StateChangeEvent]] messages.
*/
class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer {
import PersistentRepr.Undefined
val AtomicWriteClass = classOf[AtomicWrite]
val PersistentReprClass = classOf[PersistentRepr]
val PersistentImplClass = classOf[PersistentImpl]
val AtLeastOnceDeliverySnapshotClass = classOf[AtLeastOnceDeliverySnapshot]
2014-11-09 14:12:36 +02:00
val PersistentStateChangeEventClass = classOf[StateChangeEvent]
private lazy val serialization = SerializationExtension(system)
override val includeManifest: Boolean = true
private lazy val transportInformation: Option[Serialization.Information] = {
val address = system.provider.getDefaultAddress
if (address.hasLocalScope) None
else Some(Serialization.Information(address, system))
}
/**
* Serializes persistent messages. Delegates serialization of a persistent
* message's payload to a matching `akka.serialization.Serializer`.
*/
def toBinary(o: AnyRef): Array[Byte] = o match {
case p: PersistentRepr persistentMessageBuilder(p).build().toByteArray
case a: AtomicWrite atomicWriteBuilder(a).build().toByteArray
case a: AtLeastOnceDeliverySnapshot atLeastOnceDeliverySnapshotBuilder(a).build.toByteArray
case s: StateChangeEvent stateChangeBuilder(s).build.toByteArray
case _ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}")
}
/**
* Deserializes persistent messages. Delegates deserialization of a persistent
* message's payload to a matching `akka.serialization.Serializer`.
*/
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): Message = manifest match {
case None persistent(mf.PersistentMessage.parseFrom(bytes))
case Some(c) c match {
case PersistentImplClass persistent(mf.PersistentMessage.parseFrom(bytes))
case PersistentReprClass persistent(mf.PersistentMessage.parseFrom(bytes))
case AtomicWriteClass atomicWrite(mf.AtomicWrite.parseFrom(bytes))
case AtLeastOnceDeliverySnapshotClass atLeastOnceDeliverySnapshot(mf.AtLeastOnceDeliverySnapshot.parseFrom(bytes))
case PersistentStateChangeEventClass stateChange(mf.PersistentStateChangeEvent.parseFrom(bytes))
case _ throw new IllegalArgumentException(s"Can't deserialize object of type ${c}")
}
}
//
// toBinary helpers
//
def atLeastOnceDeliverySnapshotBuilder(snap: AtLeastOnceDeliverySnapshot): mf.AtLeastOnceDeliverySnapshot.Builder = {
val builder = mf.AtLeastOnceDeliverySnapshot.newBuilder
builder.setCurrentDeliveryId(snap.currentDeliveryId)
snap.unconfirmedDeliveries.foreach { unconfirmed
val unconfirmedBuilder =
mf.AtLeastOnceDeliverySnapshot.UnconfirmedDelivery.newBuilder.
setDeliveryId(unconfirmed.deliveryId).
setDestination(unconfirmed.destination.toString).
setPayload(persistentPayloadBuilder(unconfirmed.message.asInstanceOf[AnyRef]))
builder.addUnconfirmedDeliveries(unconfirmedBuilder)
}
builder
}
private[persistence] def stateChangeBuilder(stateChange: StateChangeEvent): mf.PersistentStateChangeEvent.Builder = {
val builder = mf.PersistentStateChangeEvent.newBuilder.setStateIdentifier(stateChange.stateIdentifier)
2014-11-09 14:12:36 +02:00
stateChange.timeout match {
case None builder
case Some(timeout) builder.setTimeout(timeout.toString())
}
}
def atLeastOnceDeliverySnapshot(atLeastOnceDeliverySnapshot: mf.AtLeastOnceDeliverySnapshot): AtLeastOnceDeliverySnapshot = {
import scala.collection.JavaConverters._
val unconfirmedDeliveries = new VectorBuilder[UnconfirmedDelivery]()
atLeastOnceDeliverySnapshot.getUnconfirmedDeliveriesList().iterator().asScala foreach { next
unconfirmedDeliveries += UnconfirmedDelivery(next.getDeliveryId, ActorPath.fromString(next.getDestination),
payload(next.getPayload))
}
AtLeastOnceDeliverySnapshot(
atLeastOnceDeliverySnapshot.getCurrentDeliveryId,
unconfirmedDeliveries.result())
}
def stateChange(persistentStateChange: mf.PersistentStateChangeEvent): StateChangeEvent = {
2014-11-09 14:12:36 +02:00
StateChangeEvent(
persistentStateChange.getStateIdentifier,
if (persistentStateChange.hasTimeout) Some(Duration(persistentStateChange.getTimeout).asInstanceOf[duration.FiniteDuration]) else None)
}
private def atomicWriteBuilder(a: AtomicWrite) = {
val builder = mf.AtomicWrite.newBuilder
a.payload.foreach { p
builder.addPayload(persistentMessageBuilder(p))
}
builder
}
private def persistentMessageBuilder(persistent: PersistentRepr) = {
val builder = mf.PersistentMessage.newBuilder
!per #15230 rename processorId => persistentId * This is NOT binary compatible, we're in an *experimental* module. * disabled binary compat checks for package akka.persistence * Source compatibility is retained, but users should migrate do the new method name ASAP. * Plugin APIs were migrated in a way that allows the old plugins to compile agains 2.3.4 without having to change anything. Hopefuly this will help authors migrate to 2.3.4 sooner. This is only source level compatible, not binary compatible. * added deprecation warnings on all processorId methods and provided bridges where possible * for users, the migration should be painless, they can still override the old method, and it'll work. But we encourage them to move to persistenceId; All delegation code will have to be removed afterwards ofc. Conflicts: akka-persistence/src/main/scala/akka/persistence/Channel.scala akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala akka-persistence/src/main/scala/akka/persistence/Persistent.scala akka-persistence/src/main/scala/akka/persistence/PersistentChannel.scala akka-persistence/src/main/scala/akka/persistence/Processor.scala akka-persistence/src/main/scala/akka/persistence/Snapshot.scala akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbKey.scala akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala project/AkkaBuild.scala
2014-06-23 14:33:35 +02:00
if (persistent.persistenceId != Undefined) builder.setPersistenceId(persistent.persistenceId)
if (persistent.sender != Actor.noSender) builder.setSender(Serialization.serializedActorPath(persistent.sender))
builder.setPayload(persistentPayloadBuilder(persistent.payload.asInstanceOf[AnyRef]))
builder.setSequenceNr(persistent.sequenceNr)
// deleted is not used in new records from 2.4
if (persistent.writerUuid != Undefined) builder.setWriterUuid(persistent.writerUuid)
builder
}
private def persistentPayloadBuilder(payload: AnyRef) = {
def payloadBuilder() = {
val serializer = serialization.findSerializerFor(payload)
val builder = mf.PersistentPayload.newBuilder()
serializer match {
case ser2: SerializerWithStringManifest
val manifest = ser2.manifest(payload)
if (manifest != "")
builder.setPayloadManifest(ByteString.copyFromUtf8(manifest))
case _
if (serializer.includeManifest)
builder.setPayloadManifest(ByteString.copyFromUtf8(payload.getClass.getName))
}
builder.setPayload(ByteString.copyFrom(serializer.toBinary(payload)))
builder.setSerializerId(serializer.identifier)
builder
}
// serialize actor references with full address information (defaultAddress)
transportInformation match {
case Some(ti) Serialization.currentTransportInformation.withValue(ti) { payloadBuilder() }
case None payloadBuilder()
}
}
//
// fromBinary helpers
//
private def persistent(persistentMessage: mf.PersistentMessage): PersistentRepr = {
PersistentRepr(
payload(persistentMessage.getPayload),
persistentMessage.getSequenceNr,
!per #15230 rename processorId => persistentId * This is NOT binary compatible, we're in an *experimental* module. * disabled binary compat checks for package akka.persistence * Source compatibility is retained, but users should migrate do the new method name ASAP. * Plugin APIs were migrated in a way that allows the old plugins to compile agains 2.3.4 without having to change anything. Hopefuly this will help authors migrate to 2.3.4 sooner. This is only source level compatible, not binary compatible. * added deprecation warnings on all processorId methods and provided bridges where possible * for users, the migration should be painless, they can still override the old method, and it'll work. But we encourage them to move to persistenceId; All delegation code will have to be removed afterwards ofc. Conflicts: akka-persistence/src/main/scala/akka/persistence/Channel.scala akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala akka-persistence/src/main/scala/akka/persistence/Persistent.scala akka-persistence/src/main/scala/akka/persistence/PersistentChannel.scala akka-persistence/src/main/scala/akka/persistence/Processor.scala akka-persistence/src/main/scala/akka/persistence/Snapshot.scala akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbKey.scala akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala project/AkkaBuild.scala
2014-06-23 14:33:35 +02:00
if (persistentMessage.hasPersistenceId) persistentMessage.getPersistenceId else Undefined,
if (persistentMessage.hasManifest) persistentMessage.getManifest else Undefined,
if (persistentMessage.hasDeleted) persistentMessage.getDeleted else false,
if (persistentMessage.hasSender) system.provider.resolveActorRef(persistentMessage.getSender) else Actor.noSender,
if (persistentMessage.hasWriterUuid) persistentMessage.getWriterUuid else Undefined)
}
private def atomicWrite(atomicWrite: mf.AtomicWrite): AtomicWrite = {
import scala.collection.JavaConverters._
AtomicWrite(atomicWrite.getPayloadList.asScala.map(persistent)(collection.breakOut))
}
private def payload(persistentPayload: mf.PersistentPayload): Any = {
val manifest = if (persistentPayload.hasPayloadManifest)
persistentPayload.getPayloadManifest.toStringUtf8 else ""
serialization.deserialize(
persistentPayload.getPayload.toByteArray,
persistentPayload.getSerializerId,
manifest).get
}
}