+act #17576 Support serializer with string manifest

* useful when evolution is needed, e.g. Akka Persistence

* docs, comments, cluster-metrics and cluster-tools serializers
This commit is contained in:
Patrik Nordwall 2015-05-28 18:42:22 +02:00
parent aeb2302c2f
commit 740f006a38
21 changed files with 605 additions and 93 deletions

View file

@ -10,6 +10,7 @@ import scala.collection.immutable
import akka.actor._
import akka.serialization.SerializationExtension
import akka.util.{ Unsafe, Helpers }
import akka.serialization.SerializerWithStringManifest
private[akka] trait Children { this: ActorCell
@ -190,7 +191,18 @@ private[akka] trait Children { this: ActorCell ⇒
props.args forall (arg
arg == null ||
arg.isInstanceOf[NoSerializationVerificationNeeded] ||
ser.deserialize(ser.serialize(arg.asInstanceOf[AnyRef]).get, arg.getClass).get != null)
{
val o = arg.asInstanceOf[AnyRef]
val serializer = ser.findSerializerFor(o)
val bytes = serializer.toBinary(o)
serializer match {
case ser2: SerializerWithStringManifest
val manifest = ser2.manifest(o)
ser.deserialize(bytes, serializer.identifier, manifest).get != null
case _
ser.deserialize(bytes, arg.getClass).get != null
}
})
} catch {
case NonFatal(e) throw new IllegalArgumentException(s"pre-creation serialization check failed at [${cell.self.path}/$name]", e)
}

View file

@ -15,6 +15,7 @@ import scala.util.control.NonFatal
import scala.util.control.Exception.Catcher
import akka.dispatch.MailboxType
import akka.dispatch.ProducesMessageQueue
import akka.serialization.SerializerWithStringManifest
private[akka] trait Dispatch { this: ActorCell
@ -117,7 +118,15 @@ private[akka] trait Dispatch { this: ActorCell ⇒
}).asInstanceOf[AnyRef]
if (!unwrapped.isInstanceOf[NoSerializationVerificationNeeded]) {
val s = SerializationExtension(system)
s.deserialize(s.serialize(unwrapped).get, unwrapped.getClass).get
val serializer = s.findSerializerFor(unwrapped)
val bytes = serializer.toBinary(unwrapped)
serializer match {
case ser2: SerializerWithStringManifest
val manifest = ser2.manifest(unwrapped)
s.deserialize(bytes, serializer.identifier, manifest).get != null
case _
s.deserialize(bytes, unwrapped.getClass).get
}
}
}
dispatcher.dispatch(this, msg)

View file

@ -13,6 +13,7 @@ import java.io.NotSerializableException
import scala.util.{ Try, DynamicVariable, Failure }
import scala.collection.immutable
import scala.util.control.NonFatal
import scala.util.Success
object Serialization {
@ -91,7 +92,7 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
/**
* Deserializes the given array of bytes using the specified serializer id,
* using the optional type hint to the Serializer and the optional ClassLoader ot load it into.
* using the optional type hint to the Serializer.
* Returns either the resulting object or an Exception if one was thrown.
*/
def deserialize[T](bytes: Array[Byte], serializerId: Int, clazz: Option[Class[_ <: T]]): Try[T] =
@ -104,9 +105,37 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
serializer.fromBinary(bytes, clazz).asInstanceOf[T]
}
/**
* Deserializes the given array of bytes using the specified serializer id,
* using the optional type hint to the Serializer.
* Returns either the resulting object or an Exception if one was thrown.
*/
def deserialize(bytes: Array[Byte], serializerId: Int, manifest: String): Try[AnyRef] =
Try {
val serializer = try serializerByIdentity(serializerId) catch {
case _: NoSuchElementException throw new NotSerializableException(
s"Cannot find serializer with id [$serializerId]. The most probable reason is that the configuration entry " +
"akka.actor.serializers is not in synch between the two systems.")
}
serializer match {
case s2: SerializerWithStringManifest s2.fromBinary(bytes, manifest)
case s1
if (manifest == "")
s1.fromBinary(bytes, None)
else {
system.dynamicAccess.getClassFor[AnyRef](manifest) match {
case Success(classManifest)
s1.fromBinary(bytes, Some(classManifest))
case Failure(e)
throw new NotSerializableException(
s"Cannot find manifest class [$manifest] for serializer with id [$serializerId].")
}
}
}
}
/**
* Deserializes the given array of bytes using the specified type to look up what Serializer should be used.
* You can specify an optional ClassLoader to load the object into.
* Returns either the resulting object or an Exception if one was thrown.
*/
def deserialize[T](bytes: Array[Byte], clazz: Class[T]): Try[T] =
@ -118,10 +147,8 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
* Throws akka.ConfigurationException if no `serialization-bindings` is configured for the
* class of the object.
*/
def findSerializerFor(o: AnyRef): Serializer = o match {
case null NullSerializer
case other serializerFor(other.getClass)
}
def findSerializerFor(o: AnyRef): Serializer =
if (o eq null) NullSerializer else serializerFor(o.getClass)
/**
* Returns the configured Serializer for the given Class. The configured Serializer
@ -205,5 +232,6 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
*/
val serializerByIdentity: Map[Int, Serializer] =
Map(NullSerializer.identifier -> NullSerializer) ++ serializers map { case (_, v) (v.identifier, v) }
}

View file

@ -26,7 +26,7 @@ import akka.serialization.JavaSerializer.CurrentSystem
* load classes using reflection.</li>
* </ul>
*
* <b>Be sure to always use the PropertyManager for loading classes!</b> This is necessary to
* <b>Be sure to always use the [[akka.actor.DynamicAccess]] for loading classes!</b> This is necessary to
* avoid strange match errors and inequalities which arise from different class loaders loading
* the same class.
*/
@ -65,6 +65,74 @@ trait Serializer {
final def fromBinary(bytes: Array[Byte], clazz: Class[_]): AnyRef = fromBinary(bytes, Option(clazz))
}
/**
* A Serializer represents a bimap between an object and an array of bytes representing that object.
*
* For serialization of data that need to evolve over time the `SerializerWithStringManifest` is recommended instead
* of [[Serializer]] because the manifest (type hint) is a `String` instead of a `Class`. That means
* that the class can be moved/removed and the serializer can still deserialize old data by matching
* on the `String`. This is especially useful for Akka Persistence.
*
* The manifest string can also encode a version number that can be used in [[#fromBinary]] to
* deserialize in different ways to migrate old data to new domain objects.
*
* If the data was originally serialized with [[Serializer]] and in a later version of the
* system you change to `SerializerWithStringManifest` the manifest string will be the full class name if
* you used `includeManifest=true`, otherwise it will be the empty string.
*
* Serializers are loaded using reflection during [[akka.actor.ActorSystem]]
* start-up, where two constructors are tried in order:
*
* <ul>
* <li>taking exactly one argument of type [[akka.actor.ExtendedActorSystem]];
* this should be the preferred one because all reflective loading of classes
* during deserialization should use ExtendedActorSystem.dynamicAccess (see
* [[akka.actor.DynamicAccess]]), and</li>
* <li>without arguments, which is only an option if the serializer does not
* load classes using reflection.</li>
* </ul>
*
* <b>Be sure to always use the [[akka.actor.DynamicAccess]] for loading classes!</b> This is necessary to
* avoid strange match errors and inequalities which arise from different class loaders loading
* the same class.
*/
abstract class SerializerWithStringManifest extends Serializer {
/**
* Completely unique value to identify this implementation of Serializer, used to optimize network traffic.
* Values from 0 to 16 are reserved for Akka internal usage.
*/
def identifier: Int
final override def includeManifest: Boolean = true
/**
* Return the manifest (type hint) that will be provided in the fromBinary method.
* Use `""` if manifest is not needed.
*/
def manifest(o: AnyRef): String
/**
* Serializes the given object into an Array of Byte
*/
def toBinary(o: AnyRef): Array[Byte]
/**
* Produces an object from an array of bytes, with an optional type-hint;
* the class should be loaded using ActorSystem.dynamicAccess.
*/
def fromBinary(bytes: Array[Byte], manifest: String): AnyRef
final def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = {
val manifestString = manifest match {
case Some(c) c.getName
case None ""
}
fromBinary(bytes, manifestString)
}
}
/**
* Base serializer trait with serialization identifiers configuration contract,
* when globally unique serialization identifier is configured in the `reference.conf`.

View file

@ -7,7 +7,6 @@ package akka.util
import java.io.{ ObjectInputStream, ObjectOutputStream }
import java.nio.{ ByteBuffer, ByteOrder }
import java.lang.{ Iterable JIterable }
import scala.annotation.varargs
import scala.collection.IndexedSeqOptimized
import scala.collection.mutable.{ Builder, WrappedArray }
@ -15,6 +14,7 @@ import scala.collection.immutable
import scala.collection.immutable.{ IndexedSeq, VectorBuilder }
import scala.collection.generic.CanBuildFrom
import scala.reflect.ClassTag
import java.nio.charset.StandardCharsets
object ByteString {
@ -42,7 +42,7 @@ object ByteString {
/**
* Creates a new ByteString by encoding a String as UTF-8.
*/
def apply(string: String): ByteString = apply(string, "UTF-8")
def apply(string: String): ByteString = apply(string, UTF_8)
/**
* Creates a new ByteString by encoding a String with a charset.
@ -79,6 +79,11 @@ object ByteString {
*/
def fromString(string: String, charset: String): ByteString = apply(string, charset)
/**
* Standard "UTF-8" charset
*/
val UTF_8: String = StandardCharsets.UTF_8.name()
/**
* Creates a new ByteString by copying bytes out of a ByteBuffer.
*/
@ -484,7 +489,7 @@ sealed abstract class ByteString extends IndexedSeq[Byte] with IndexedSeqOptimiz
/**
* Decodes this ByteString as a UTF-8 encoded String.
*/
final def utf8String: String = decodeString("UTF-8")
final def utf8String: String = decodeString(ByteString.UTF_8)
/**
* Decodes this ByteString using a charset to produce a String.
@ -539,7 +544,7 @@ object CompactByteString {
/**
* Creates a new CompactByteString by encoding a String as UTF-8.
*/
def apply(string: String): CompactByteString = apply(string, "UTF-8")
def apply(string: String): CompactByteString = apply(string, ByteString.UTF_8)
/**
* Creates a new CompactByteString by encoding a String with a charset.

View file

@ -7,34 +7,36 @@ package akka.cluster.metrics.protobuf
import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, ObjectOutputStream }
import java.util.zip.{ GZIPInputStream, GZIPOutputStream }
import java.{ lang jl }
import akka.actor.{ Address, ExtendedActorSystem }
import akka.cluster.metrics.protobuf.msg.{ ClusterMetricsMessages cm }
import akka.cluster.metrics.{ ClusterMetricsMessage, ClusterMetricsSettings, EWMA, Metric, MetricsGossip, MetricsGossipEnvelope, NodeMetrics }
import akka.serialization.BaseSerializer
import akka.util.ClassLoaderObjectInputStream
import com.google.protobuf.{ ByteString, MessageLite }
import scala.annotation.tailrec
import scala.collection.JavaConverters.{ asJavaIterableConverter, asScalaBufferConverter, setAsJavaSetConverter }
import akka.serialization.SerializerWithStringManifest
/**
* Protobuf serializer for [[akka.cluster.metrics.ClusterMetricsMessage]] types.
*/
class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer {
class MessageSerializer(val system: ExtendedActorSystem) extends SerializerWithStringManifest with BaseSerializer {
private final val BufferSize = 4 * 1024
private val fromBinaryMap = collection.immutable.HashMap[Class[_ <: ClusterMetricsMessage], Array[Byte] AnyRef](
classOf[MetricsGossipEnvelope] -> metricsGossipEnvelopeFromBinary)
private val MetricsGossipEnvelopeManifest = "a"
override val includeManifest: Boolean = true
override def manifest(obj: AnyRef): String = obj match {
case _: MetricsGossipEnvelope MetricsGossipEnvelopeManifest
case _
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
}
override def toBinary(obj: AnyRef): Array[Byte] = obj match {
case m: MetricsGossipEnvelope
compress(metricsGossipEnvelopeToProto(m))
case _
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass}")
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
}
def compress(msg: MessageLite): Array[Byte] = {
@ -61,12 +63,10 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
out.toByteArray
}
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = clazz match {
case Some(c) fromBinaryMap.get(c.asInstanceOf[Class[ClusterMetricsMessage]]) match {
case Some(f) f(bytes)
case None throw new IllegalArgumentException(s"Unimplemented deserialization of message class $c in metrics")
}
case _ throw new IllegalArgumentException("Need a metrics message class to be able to deserialize bytes in metrics")
override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
case MetricsGossipEnvelopeManifest metricsGossipEnvelopeFromBinary(bytes)
case _ throw new IllegalArgumentException(
s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}")
}
private def addressToProto(address: Address): cm.Address.Builder = address match {

View file

@ -24,7 +24,7 @@ class MessageSerializerSpec extends AkkaSpec(
def checkSerialization(obj: AnyRef): Unit = {
val blob = serializer.toBinary(obj)
val ref = serializer.fromBinary(blob, obj.getClass)
val ref = serializer.fromBinary(blob, serializer.manifest(obj))
obj match {
case _
ref should ===(obj)

View file

@ -27,40 +27,57 @@ import akka.serialization.Serialization
import akka.actor.ActorRef
import akka.serialization.SerializationExtension
import scala.collection.immutable.TreeMap
import akka.serialization.SerializerWithStringManifest
/**
* Protobuf serializer of DistributedPubSubMediator messages.
*/
class DistributedPubSubMessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer {
class DistributedPubSubMessageSerializer(val system: ExtendedActorSystem)
extends SerializerWithStringManifest with BaseSerializer {
private lazy val serialization = SerializationExtension(system)
private final val BufferSize = 1024 * 4
private val fromBinaryMap = collection.immutable.HashMap[Class[_ <: DistributedPubSubMessage], Array[Byte] AnyRef](
classOf[Status] -> statusFromBinary,
classOf[Delta] -> deltaFromBinary,
classOf[Send] -> sendFromBinary,
classOf[SendToAll] -> sendToAllFromBinary,
classOf[Publish] -> publishFromBinary)
private val StatusManifest = "A"
private val DeltaManifest = "B"
private val SendManifest = "C"
private val SendToAllManifest = "D"
private val PublishManifest = "E"
def includeManifest: Boolean = true
private val fromBinaryMap = collection.immutable.HashMap[String, Array[Byte] AnyRef](
StatusManifest -> statusFromBinary,
DeltaManifest -> deltaFromBinary,
SendManifest -> sendFromBinary,
SendToAllManifest -> sendToAllFromBinary,
PublishManifest -> publishFromBinary)
def toBinary(obj: AnyRef): Array[Byte] = obj match {
override def manifest(obj: AnyRef): String = obj match {
case _: Status StatusManifest
case _: Delta DeltaManifest
case _: Send SendManifest
case _: SendToAll SendToAllManifest
case _: Publish PublishManifest
case _
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
}
override def toBinary(obj: AnyRef): Array[Byte] = obj match {
case m: Status compress(statusToProto(m))
case m: Delta compress(deltaToProto(m))
case m: Send sendToProto(m).toByteArray
case m: SendToAll sendToAllToProto(m).toByteArray
case m: Publish publishToProto(m).toByteArray
case _
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass}")
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
}
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = clazz match {
case Some(c) fromBinaryMap.get(c.asInstanceOf[Class[DistributedPubSubMessage]]) match {
override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
fromBinaryMap.get(manifest) match {
case Some(f) f(bytes)
case None throw new IllegalArgumentException(s"Unimplemented deserialization of message class $c in DistributedPubSubMessageSerializer")
case None throw new IllegalArgumentException(
s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]")
}
case _ throw new IllegalArgumentException("Need a message class to be able to deserialize bytes in DistributedPubSubMessageSerializer")
}
def compress(msg: MessageLite): Array[Byte] = {
val bos = new ByteArrayOutputStream(BufferSize)
@ -189,21 +206,30 @@ class DistributedPubSubMessageSerializer(val system: ExtendedActorSystem) extend
private def payloadToProto(msg: Any): dm.Payload = {
val m = msg.asInstanceOf[AnyRef]
val msgSerializer = SerializationExtension(system).findSerializerFor(m)
val msgSerializer = serialization.findSerializerFor(m)
val builder = dm.Payload.newBuilder().
setEnclosedMessage(ByteString.copyFrom(msgSerializer.toBinary(m)))
.setSerializerId(msgSerializer.identifier)
if (msgSerializer.includeManifest)
builder.setMessageManifest(ByteString.copyFromUtf8(m.getClass.getName))
msgSerializer match {
case ser2: SerializerWithStringManifest
val manifest = ser2.manifest(m)
if (manifest != "")
builder.setMessageManifest(ByteString.copyFromUtf8(manifest))
case _
if (msgSerializer.includeManifest)
builder.setMessageManifest(ByteString.copyFromUtf8(m.getClass.getName))
}
builder.build()
}
private def payloadFromProto(payload: dm.Payload): AnyRef = {
SerializationExtension(system).deserialize(
val manifest = if (payload.hasMessageManifest) payload.getMessageManifest.toStringUtf8 else ""
serialization.deserialize(
payload.getEnclosedMessage.toByteArray,
payload.getSerializerId,
if (payload.hasMessageManifest)
Some(system.dynamicAccess.getClassFor[AnyRef](payload.getMessageManifest.toStringUtf8).get) else None).get
manifest).get
}
}

View file

@ -17,7 +17,7 @@ class DistributedPubSubMessageSerializerSpec extends AkkaSpec {
def checkSerialization(obj: AnyRef): Unit = {
val blob = serializer.toBinary(obj)
val ref = serializer.fromBinary(blob, obj.getClass)
val ref = serializer.fromBinary(blob, serializer.manifest(obj))
ref should ===(obj)
}

View file

@ -3,9 +3,14 @@
*/
package docs.serialization;
import java.io.UnsupportedEncodingException;
import akka.testkit.JavaTestKit;
import org.junit.Test;
import static org.junit.Assert.*;
import java.nio.charset.StandardCharsets;
//#imports
import akka.actor.*;
import akka.serialization.*;
@ -49,6 +54,79 @@ public class SerializationDocTest {
}
//#my-own-serializer
static class Customer {
public final String name;
Customer(String name) {
this.name = name;
}
}
static class User {
public final String name;
User(String name) {
this.name = name;
}
}
static
//#my-own-serializer2
public class MyOwnSerializer2 extends SerializerWithStringManifest {
private static final String CUSTOMER_MANIFEST = "customer";
private static final String USER_MANIFEST = "user";
private static final String UTF_8 = StandardCharsets.UTF_8.name();
// Pick a unique identifier for your Serializer,
// you've got a couple of billions to choose from,
// 0 - 16 is reserved by Akka itself
@Override public int identifier() {
return 1234567;
}
@Override public String manifest(Object obj) {
if (obj instanceof Customer)
return CUSTOMER_MANIFEST;
else if (obj instanceof User)
return USER_MANIFEST;
else
throw new IllegalArgumentException("Unknow type: " + obj);
}
// "toBinary" serializes the given object to an Array of Bytes
@Override public byte[] toBinary(Object obj) {
// Put the real code that serializes the object here
try {
if (obj instanceof Customer)
return ((Customer) obj).name.getBytes(UTF_8);
else if (obj instanceof User)
return ((User) obj).name.getBytes(UTF_8);
else
throw new IllegalArgumentException("Unknow type: " + obj);
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e.getMessage(), e);
}
}
// "fromBinary" deserializes the given array,
// using the type hint
@Override public Object fromBinary(byte[] bytes, String manifest) {
// Put the real code that deserializes here
try {
if (manifest.equals(CUSTOMER_MANIFEST))
return new Customer(new String(bytes, UTF_8));
else if (manifest.equals(USER_MANIFEST))
return new User(new String(bytes, UTF_8));
else
throw new IllegalArgumentException("Unknow manifest: " + manifest);
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e.getMessage(), e);
}
}
}
//#my-own-serializer2
@Test public void serializeActorRefs() {
final ExtendedActorSystem extendedSystem = (ExtendedActorSystem)
ActorSystem.create("whatever");

View file

@ -103,9 +103,38 @@ which is done by extending ``akka.serialization.JSerializer``, like this:
:include: my-own-serializer
:exclude: ...
The manifest is a type hint so that the same serializer can be used for different
classes. The manifest parameter in ``fromBinaryJava`` is the class of the object that
was serialized. In ``fromBinary`` you can match on the class and deserialize the
bytes to different objects.
Then you only need to fill in the blanks, bind it to a name in your :ref:`configuration` and then
list which classes that should be serialized using it.
Serializer with String Manifest
-------------------------------
The ``Serializer`` illustrated above supports a class based manifest (type hint).
For serialization of data that need to evolve over time the `SerializerWithStringManifest`
is recommended instead of ``Serializer`` because the manifest (type hint) is a ``String``
instead of a ``Class``. That means that the class can be moved/removed and the serializer
can still deserialize old data by matching on the ``String``. This is especially useful
for :ref:`persistence-java`.
The manifest string can also encode a version number that can be used in ``fromBinary`` to
deserialize in different ways to migrate old data to new domain objects.
If the data was originally serialized with ``Serializer`` and in a later version of the
system you change to ``SerializerWithStringManifest`` the manifest string will be the full
class name if you used ``includeManifest=true``, otherwise it will be the empty string.
This is how a ``SerializerWithStringManifest`` looks like:
.. includecode:: code/docs/serialization/SerializationDocTest.java#my-own-serializer2
You must also bind it to a name in your :ref:`configuration` and then list which classes
that should be serialized using it.
Serializing ActorRefs
---------------------

View file

@ -15,12 +15,13 @@ package docs.serialization {
import akka.actor.ExtendedActorSystem
import akka.actor.Extension
import akka.actor.Address
import java.nio.charset.StandardCharsets
//#my-own-serializer
class MyOwnSerializer extends Serializer {
// This is whether "fromBinary" requires a "clazz" or not
def includeManifest: Boolean = false
def includeManifest: Boolean = true
// Pick a unique identifier for your Serializer,
// you've got a couple of billions to choose from,
@ -37,7 +38,6 @@ package docs.serialization {
// "fromBinary" deserializes the given array,
// using the type hint (if any, see "includeManifest" above)
// into the optionally provided classLoader.
def fromBinary(bytes: Array[Byte],
clazz: Option[Class[_]]): AnyRef = {
// Put your code that deserializes here
@ -48,8 +48,52 @@ package docs.serialization {
}
//#my-own-serializer
//#my-own-serializer2
class MyOwnSerializer2 extends SerializerWithStringManifest {
val CustomerManifest = "customer"
val UserManifest = "user"
val UTF_8 = StandardCharsets.UTF_8.name()
// Pick a unique identifier for your Serializer,
// you've got a couple of billions to choose from,
// 0 - 16 is reserved by Akka itself
def identifier = 1234567
// The manifest (type hint) that will be provided in the fromBinary method
// Use `""` if manifest is not needed.
def manifest(obj: AnyRef): String =
obj match {
case _: Customer => CustomerManifest
case _: User => UserManifest
}
// "toBinary" serializes the given object to an Array of Bytes
def toBinary(obj: AnyRef): Array[Byte] = {
// Put the real code that serializes the object here
obj match {
case Customer(name) => name.getBytes(UTF_8)
case User(name) => name.getBytes(UTF_8)
}
}
// "fromBinary" deserializes the given array,
// using the type hint
def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
// Put the real code that deserializes here
manifest match {
case CustomerManifest =>
Customer(new String(bytes, UTF_8))
case UserManifest =>
User(new String(bytes, UTF_8))
}
}
}
//#my-own-serializer2
trait MyOwnSerializable
final case class Customer(name: String) extends MyOwnSerializable
final case class User(name: String) extends MyOwnSerializable
class SerializationDocSpec extends AkkaSpec {
"demonstrate configuration of serialize messages" in {

View file

@ -95,9 +95,38 @@ First you need to create a class definition of your ``Serializer`` like so:
:include: imports,my-own-serializer
:exclude: ...
The manifest is a type hint so that the same serializer can be used for different
classes. The manifest parameter in ``fromBinary`` is the class of the object that
was serialized. In ``fromBinary`` you can match on the class and deserialize the
bytes to different objects.
Then you only need to fill in the blanks, bind it to a name in your :ref:`configuration` and then
list which classes that should be serialized using it.
Serializer with String Manifest
-------------------------------
The ``Serializer`` illustrated above supports a class based manifest (type hint).
For serialization of data that need to evolve over time the `SerializerWithStringManifest`
is recommended instead of ``Serializer`` because the manifest (type hint) is a ``String``
instead of a ``Class``. That means that the class can be moved/removed and the serializer
can still deserialize old data by matching on the ``String``. This is especially useful
for :ref:`persistence-scala`.
The manifest string can also encode a version number that can be used in ``fromBinary`` to
deserialize in different ways to migrate old data to new domain objects.
If the data was originally serialized with ``Serializer`` and in a later version of the
system you change to ``SerializerWithStringManifest`` the manifest string will be the full
class name if you used ``includeManifest=true``, otherwise it will be the empty string.
This is how a ``SerializerWithStringManifest`` looks like:
.. includecode:: code/docs/serialization/SerializationDocSpec.scala#my-own-serializer2
You must also bind it to a name in your :ref:`configuration` and then list which classes
that should be serialized using it.
Serializing ActorRefs
---------------------

View file

@ -7,6 +7,7 @@ package akka.persistence.journal.leveldb
import org.iq80.leveldb.DBIterator
import akka.actor.Actor
import akka.util.ByteString.UTF_8
/**
* INTERNAL API.
@ -38,7 +39,7 @@ private[persistence] trait LeveldbIdMapping extends Actor { this: LeveldbStore
val nextEntry = iter.next()
val nextKey = keyFromBytes(nextEntry.getKey)
if (!isMappingKey(nextKey)) pathMap else {
val nextVal = new String(nextEntry.getValue, "UTF-8")
val nextVal = new String(nextEntry.getValue, UTF_8)
readIdMap(pathMap + (nextVal -> nextKey.mappingId), iter)
}
}
@ -46,7 +47,7 @@ private[persistence] trait LeveldbIdMapping extends Actor { this: LeveldbStore
private def writeIdMapping(id: String, numericId: Int): Int = {
idMap = idMap + (id -> numericId)
leveldb.put(keyToBytes(mappingKey(numericId)), id.getBytes("UTF-8"))
leveldb.put(keyToBytes(mappingKey(numericId)), id.getBytes(UTF_8))
numericId
}

View file

@ -30,6 +30,8 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
val PersistentImplClass = classOf[PersistentImpl]
val AtLeastOnceDeliverySnapshotClass = classOf[AtLeastOnceDeliverySnap]
private lazy val serialization = SerializationExtension(system)
override val includeManifest: Boolean = true
private lazy val transportInformation: Option[Serialization.Information] = {
@ -107,10 +109,18 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
private def persistentPayloadBuilder(payload: AnyRef) = {
def payloadBuilder() = {
val serializer = SerializationExtension(system).findSerializerFor(payload)
val serializer = serialization.findSerializerFor(payload)
val builder = PersistentPayload.newBuilder()
if (serializer.includeManifest) builder.setPayloadManifest(ByteString.copyFromUtf8(payload.getClass.getName))
serializer match {
case ser2: SerializerWithStringManifest
val manifest = ser2.manifest(payload)
if (manifest != "")
builder.setPayloadManifest(ByteString.copyFromUtf8(manifest))
case _
if (serializer.includeManifest)
builder.setPayloadManifest(ByteString.copyFromUtf8(payload.getClass.getName))
}
builder.setPayload(ByteString.copyFrom(serializer.toBinary(payload)))
builder.setSerializerId(serializer.identifier)
@ -138,13 +148,13 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
}
private def payload(persistentPayload: PersistentPayload): Any = {
val payloadClass = if (persistentPayload.hasPayloadManifest)
Some(system.dynamicAccess.getClassFor[AnyRef](persistentPayload.getPayloadManifest.toStringUtf8).get) else None
val manifest = if (persistentPayload.hasPayloadManifest)
persistentPayload.getPayloadManifest.toStringUtf8 else ""
SerializationExtension(system).deserialize(
serialization.deserialize(
persistentPayload.getPayload.toByteArray,
persistentPayload.getSerializerId,
payloadClass).get
manifest).get
}
}

View file

@ -8,6 +8,7 @@ package akka.persistence.serialization
import java.io._
import akka.actor._
import akka.serialization._
import akka.util.ByteString.UTF_8
import scala.util.Success
import scala.util.Failure
@ -33,6 +34,8 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer
override val includeManifest: Boolean = false
private lazy val serialization = SerializationExtension(system)
private lazy val transportInformation: Option[Serialization.Information] = {
val address = system.provider.getDefaultAddress
if (address.hasLocalScope) None
@ -57,14 +60,21 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer
private def snapshotToBinary(snapshot: AnyRef): Array[Byte] = {
def serialize() = {
val extension = SerializationExtension(system)
val snapshotSerializer = extension.findSerializerFor(snapshot)
val snapshotSerializer = serialization.findSerializerFor(snapshot)
val headerOut = new ByteArrayOutputStream
writeInt(headerOut, snapshotSerializer.identifier)
if (snapshotSerializer.includeManifest)
headerOut.write(snapshot.getClass.getName.getBytes("utf-8"))
snapshotSerializer match {
case ser2: SerializerWithStringManifest
val manifest = ser2.manifest(snapshot)
if (manifest != "")
headerOut.write(manifest.getBytes(UTF_8))
case _
if (snapshotSerializer.includeManifest)
headerOut.write(snapshot.getClass.getName.getBytes(UTF_8))
}
val headerBytes = headerOut.toByteArray
val out = new ByteArrayOutputStream
@ -84,8 +94,6 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer
}
private def snapshotFromBinary(bytes: Array[Byte]): AnyRef = {
val extension = SerializationExtension(system)
val in = new ByteArrayInputStream(bytes)
val headerLength = readInt(in)
val headerBytes = bytes.slice(4, headerLength + 4)
@ -122,7 +130,7 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer
val oldHeader =
if (readShort(in) == 0xedac) { // Java Serialization magic value with swapped bytes
val b = if (SnapshotSerializer.doPatch) patch(headerBytes) else headerBytes
extension.deserialize(b, classOf[SnapshotHeader]).toOption
serialization.deserialize(b, classOf[SnapshotHeader]).toOption
} else None
val header = oldHeader.getOrElse {
@ -134,13 +142,12 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer
else {
val manifestBytes = Array.ofDim[Byte](remaining)
headerIn.read(manifestBytes)
Some(new String(manifestBytes, "utf-8"))
Some(new String(manifestBytes, UTF_8))
}
SnapshotHeader(serializerId, manifest)
}
val manifest = header.manifest.map(system.dynamicAccess.getClassFor[AnyRef](_).get)
extension.deserialize[AnyRef](snapshotBytes, header.serializerId, manifest).get
serialization.deserialize(snapshotBytes, header.serializerId, header.manifest.getOrElse("")).get
}
private def writeInt(outputStream: OutputStream, i: Int) =
@ -205,4 +212,4 @@ object SnapshotSerializer {
} else false
} else false
}
}
}

View file

@ -17,6 +17,7 @@ import akka.persistence._
import akka.persistence.snapshot._
import akka.persistence.serialization._
import akka.serialization.SerializationExtension
import akka.util.ByteString.UTF_8
/**
* INTERNAL API.
@ -102,13 +103,13 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo
try { p(stream) } finally { stream.close() }
private def snapshotFile(metadata: SnapshotMetadata, extension: String = ""): File =
new File(snapshotDir, s"snapshot-${URLEncoder.encode(metadata.persistenceId, "UTF-8")}-${metadata.sequenceNr}-${metadata.timestamp}${extension}")
new File(snapshotDir, s"snapshot-${URLEncoder.encode(metadata.persistenceId, UTF_8)}-${metadata.sequenceNr}-${metadata.timestamp}${extension}")
private def snapshotMetadata(persistenceId: String, criteria: SnapshotSelectionCriteria): immutable.Seq[SnapshotMetadata] = {
val files = snapshotDir.listFiles(new SnapshotFilenameFilter(persistenceId))
if (files eq null) Nil // if the dir was removed
else files.map(_.getName).collect {
case FilenamePattern(pid, snr, tms) SnapshotMetadata(URLDecoder.decode(pid, "UTF-8"), snr.toLong, tms.toLong)
case FilenamePattern(pid, snr, tms) SnapshotMetadata(URLDecoder.decode(pid, UTF_8), snr.toLong, tms.toLong)
}.filter(md criteria.matches(md) && !saving.contains(md)).toVector
}

View file

@ -10,24 +10,34 @@ import akka.actor._
import akka.persistence._
import akka.serialization._
import akka.testkit._
import akka.persistence.AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot
import akka.persistence.AtLeastOnceDelivery.UnconfirmedDelivery
import akka.util.ByteString.UTF_8
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import org.apache.commons.codec.binary.Hex.decodeHex
import SerializerSpecConfigs._
object SerializerSpecConfigs {
val customSerializers = ConfigFactory.parseString(
"""
akka.actor {
serializers {
my-payload = "akka.persistence.serialization.MyPayloadSerializer"
my-payload2 = "akka.persistence.serialization.MyPayload2Serializer"
my-snapshot = "akka.persistence.serialization.MySnapshotSerializer"
my-snapshot2 = "akka.persistence.serialization.MySnapshotSerializer2"
old-payload = "akka.persistence.serialization.OldPayloadSerializer"
}
serialization-bindings {
"akka.persistence.serialization.MyPayload" = my-payload
"akka.persistence.serialization.MyPayload2" = my-payload2
"akka.persistence.serialization.MySnapshot" = my-snapshot
"akka.persistence.serialization.MySnapshot2" = my-snapshot2
# this entry was used when creating the data for the test
# "deserialize data when class is removed"
#"akka.persistence.serialization.OldPayload" = old-payload
}
}
""")
@ -53,6 +63,7 @@ object SerializerSpecConfigs {
def config(configs: String*): Config =
configs.foldLeft(ConfigFactory.empty)((r, c) r.withFallback(ConfigFactory.parseString(c)))
}
import SerializerSpecConfigs._
@ -71,9 +82,19 @@ class SnapshotSerializerPersistenceSpec extends AkkaSpec(customSerializers) {
deserialized should ===(Snapshot(MySnapshot(".a.")))
}
"handle custom snapshot Serialization with string manifest" in {
val wrapped = Snapshot(MySnapshot2("a"))
val serializer = serialization.findSerializerFor(wrapped)
val bytes = serializer.toBinary(wrapped)
val deserialized = serializer.fromBinary(bytes, None)
deserialized should ===(Snapshot(MySnapshot2(".a.")))
}
"be able to read snapshot created with akka 2.3.6 and Scala 2.10" in {
val dataStr = "abc"
val snapshot = Snapshot(dataStr.getBytes("utf-8"))
val snapshot = Snapshot(dataStr.getBytes(UTF_8))
val serializer = serialization.findSerializerFor(snapshot)
// the oldSnapshot was created with Akka 2.3.6 and it is using JavaSerialization
@ -91,13 +112,13 @@ class SnapshotSerializerPersistenceSpec extends AkkaSpec(customSerializers) {
val bytes = decodeHex(oldSnapshot.toCharArray)
val deserialized = serializer.fromBinary(bytes, None).asInstanceOf[Snapshot]
val deserializedDataStr = new String(deserialized.data.asInstanceOf[Array[Byte]], "utf-8")
val deserializedDataStr = new String(deserialized.data.asInstanceOf[Array[Byte]], UTF_8)
dataStr should ===(deserializedDataStr)
}
"be able to read snapshot created with akka 2.3.6 and Scala 2.11" in {
val dataStr = "abc"
val snapshot = Snapshot(dataStr.getBytes("utf-8"))
val snapshot = Snapshot(dataStr.getBytes(UTF_8))
val serializer = serialization.findSerializerFor(snapshot)
// the oldSnapshot was created with Akka 2.3.6 and it is using JavaSerialization
@ -115,7 +136,7 @@ class SnapshotSerializerPersistenceSpec extends AkkaSpec(customSerializers) {
val bytes = decodeHex(oldSnapshot.toCharArray)
val deserialized = serializer.fromBinary(bytes, None).asInstanceOf[Snapshot]
val deserializedDataStr = new String(deserialized.data.asInstanceOf[Array[Byte]], "utf-8")
val deserializedDataStr = new String(deserialized.data.asInstanceOf[Array[Byte]], UTF_8)
dataStr should ===(deserializedDataStr)
}
}
@ -136,6 +157,7 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) {
deserialized should ===(persistent.withPayload(MyPayload(".a.")))
}
}
"given a PersistentRepr manifest" must {
"handle custom Persistent message serialization" in {
val persistent = PersistentRepr(MyPayload("b"), 13, "p1", true, testActor)
@ -148,6 +170,57 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) {
}
}
"given payload serializer with string manifest" must {
"handle serialization" in {
val persistent = PersistentRepr(MyPayload2("a", 17), 13, "p1", true, testActor)
val serializer = serialization.findSerializerFor(persistent)
val bytes = serializer.toBinary(persistent)
val deserialized = serializer.fromBinary(bytes, None)
deserialized should ===(persistent.withPayload(MyPayload2(".a.", 17)))
}
"be able to evolve the data types" in {
val oldEvent = MyPayload("a")
val serializer1 = serialization.findSerializerFor(oldEvent)
val bytes = serializer1.toBinary(oldEvent)
// now the system is updated to version 2 with new class MyPayload2
// and MyPayload2Serializer that handles migration from old MyPayload
val serializer2 = serialization.serializerFor(classOf[MyPayload2])
val deserialized = serializer2.fromBinary(bytes, Some(oldEvent.getClass))
deserialized should be(MyPayload2(".a.", 0))
}
"be able to deserialize data when class is removed" in {
val serializer = serialization.findSerializerFor(PersistentRepr("x", 13, "p1", true, testActor))
// It was created with:
// val old = PersistentRepr(OldPayload('A'), 13, "p1", true, testActor)
// import org.apache.commons.codec.binary.Hex._
// println(s"encoded OldPayload: " + String.valueOf(encodeHex(serializer.toBinary(old))))
//
val oldData =
"0a3e08c7da04120d4f6c645061796c6f61642841291a2" +
"9616b6b612e70657273697374656e63652e7365726961" +
"6c697a6174696f6e2e4f6c645061796c6f6164100d1a0" +
"2703120015a45616b6b613a2f2f4d6573736167655365" +
"7269616c697a657250657273697374656e63655370656" +
"32f73797374656d2f746573744163746f722d31233133" +
"3137373931343033"
// now the system is updated, OldPayload is replaced by MyPayload, and the
// OldPayloadSerializer is adjusted to migrate OldPayload
val bytes = decodeHex(oldData.toCharArray)
val deserialized = serializer.fromBinary(bytes, None).asInstanceOf[PersistentRepr]
deserialized.payload should be(MyPayload("OldPayload(A)"))
}
}
"given AtLeastOnceDeliverySnapshot" must {
"handle empty unconfirmed" in {
val unconfirmed = Vector.empty
@ -173,7 +246,6 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) {
deserialized should ===(snap)
}
}
}
@ -222,7 +294,13 @@ class MessageSerializerRemotingSpec extends AkkaSpec(remote.withFallback(customS
}
final case class MyPayload(data: String)
final case class MyPayload2(data: String, n: Int)
final case class MySnapshot(data: String)
final case class MySnapshot2(data: String)
// this class was used when creating the data for the test
// "deserialize data when class is removed"
//final case class OldPayload(c: Char)
class MyPayloadSerializer extends Serializer {
val MyPayloadClass = classOf[MyPayload]
@ -231,16 +309,41 @@ class MyPayloadSerializer extends Serializer {
def includeManifest: Boolean = true
def toBinary(o: AnyRef): Array[Byte] = o match {
case MyPayload(data) s".${data}".getBytes("UTF-8")
case MyPayload(data) s".${data}".getBytes(UTF_8)
}
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = manifest match {
case Some(MyPayloadClass) MyPayload(s"${new String(bytes, "UTF-8")}.")
case Some(MyPayloadClass) MyPayload(s"${new String(bytes, UTF_8)}.")
case Some(c) throw new Exception(s"unexpected manifest ${c}")
case None throw new Exception("no manifest")
}
}
class MyPayload2Serializer extends SerializerWithStringManifest {
val MyPayload2Class = classOf[MyPayload]
val ManifestV1 = classOf[MyPayload].getName
val ManifestV2 = "MyPayload-V2"
def identifier: Int = 77125
def manifest(o: AnyRef): String = ManifestV2
def toBinary(o: AnyRef): Array[Byte] = o match {
case MyPayload2(data, n) s".$data:$n".getBytes(UTF_8)
}
def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
case ManifestV2
val parts = new String(bytes, UTF_8).split(":")
MyPayload2(data = parts(0) + ".", n = parts(1).toInt)
case ManifestV1
MyPayload2(data = s"${new String(bytes, UTF_8)}.", n = 0)
case other
throw new Exception(s"unexpected manifest [$other]")
}
}
class MySnapshotSerializer extends Serializer {
val MySnapshotClass = classOf[MySnapshot]
@ -248,12 +351,55 @@ class MySnapshotSerializer extends Serializer {
def includeManifest: Boolean = true
def toBinary(o: AnyRef): Array[Byte] = o match {
case MySnapshot(data) s".${data}".getBytes("UTF-8")
case MySnapshot(data) s".${data}".getBytes(UTF_8)
}
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = manifest match {
case Some(MySnapshotClass) MySnapshot(s"${new String(bytes, "UTF-8")}.")
case Some(MySnapshotClass) MySnapshot(s"${new String(bytes, UTF_8)}.")
case Some(c) throw new Exception(s"unexpected manifest ${c}")
case None throw new Exception("no manifest")
}
}
class MySnapshotSerializer2 extends SerializerWithStringManifest {
val CurrentManifest = "MySnapshot-V2"
val OldManifest = classOf[MySnapshot].getName
def identifier: Int = 77126
def manifest(o: AnyRef): String = CurrentManifest
def toBinary(o: AnyRef): Array[Byte] = o match {
case MySnapshot2(data) s".${data}".getBytes(UTF_8)
}
def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
case CurrentManifest | OldManifest
MySnapshot2(s"${new String(bytes, UTF_8)}.")
case other
throw new Exception(s"unexpected manifest [$other]")
}
}
class OldPayloadSerializer extends SerializerWithStringManifest {
def identifier: Int = 77127
val OldPayloadClassName = "akka.persistence.serialization.OldPayload"
val MyPayloadClassName = classOf[MyPayload].getName
def manifest(o: AnyRef): String = o.getClass.getName
def toBinary(o: AnyRef): Array[Byte] = o match {
case MyPayload(data) s".${data}".getBytes(UTF_8)
case old if old.getClass.getName == OldPayloadClassName
o.toString.getBytes(UTF_8)
}
def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
case OldPayloadClassName
MyPayload(new String(bytes, UTF_8))
case MyPayloadClassName MyPayload(s"${new String(bytes, UTF_8)}.")
case other
throw new Exception(s"unexpected manifest [$other]")
}
}

View file

@ -8,6 +8,7 @@ import akka.remote.WireFormats._
import com.google.protobuf.ByteString
import akka.actor.ExtendedActorSystem
import akka.serialization.SerializationExtension
import akka.serialization.SerializerWithStringManifest
/**
* INTERNAL API
@ -23,7 +24,7 @@ private[akka] object MessageSerializer {
SerializationExtension(system).deserialize(
messageProtocol.getMessage.toByteArray,
messageProtocol.getSerializerId,
if (messageProtocol.hasMessageManifest) Some(system.dynamicAccess.getClassFor[AnyRef](messageProtocol.getMessageManifest.toStringUtf8).get) else None).get
if (messageProtocol.hasMessageManifest) messageProtocol.getMessageManifest.toStringUtf8 else "").get
}
/**
@ -35,8 +36,15 @@ private[akka] object MessageSerializer {
val builder = SerializedMessage.newBuilder
builder.setMessage(ByteString.copyFrom(serializer.toBinary(message)))
builder.setSerializerId(serializer.identifier)
if (serializer.includeManifest)
builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName))
serializer match {
case ser2: SerializerWithStringManifest
val manifest = ser2.manifest(message)
if (manifest != "")
builder.setMessageManifest(ByteString.copyFromUtf8(manifest))
case _
if (serializer.includeManifest)
builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName))
}
builder.build
}
}

View file

@ -22,12 +22,13 @@ import scala.util.{ Failure, Success }
import akka.remote.transport.AkkaPduCodec.Message
import java.util.concurrent.ConcurrentHashMap
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import akka.util.ByteString.UTF_8
/**
* INTERNAL API
*/
private[remote] object AddressUrlEncoder {
def apply(address: Address): String = URLEncoder.encode(address.toString, "utf-8")
def apply(address: Address): String = URLEncoder.encode(address.toString, UTF_8)
}
/**

View file

@ -14,12 +14,15 @@ import akka.actor.SelectionPathElement
import akka.remote.ContainerFormats
import akka.serialization.SerializationExtension
import akka.serialization.BaseSerializer
import akka.serialization.SerializerWithStringManifest
class MessageContainerSerializer(val system: ExtendedActorSystem) extends BaseSerializer {
@deprecated("Use constructor with ExtendedActorSystem", "2.4")
def this() = this(null)
private lazy val serialization = SerializationExtension(system)
// TODO remove this when deprecated this() is removed
override val identifier: Int =
if (system eq null) 6
@ -37,14 +40,21 @@ class MessageContainerSerializer(val system: ExtendedActorSystem) extends BaseSe
private def serializeSelection(sel: ActorSelectionMessage): Array[Byte] = {
val builder = ContainerFormats.SelectionEnvelope.newBuilder()
val message = sel.msg.asInstanceOf[AnyRef]
val serializer = SerializationExtension(system).findSerializerFor(message)
val serializer = serialization.findSerializerFor(message)
builder.
setEnclosedMessage(ByteString.copyFrom(serializer.toBinary(message))).
setSerializerId(serializer.identifier).
setWildcardFanOut(sel.wildcardFanOut)
if (serializer.includeManifest)
builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName))
serializer match {
case ser2: SerializerWithStringManifest
val manifest = ser2.manifest(message)
if (manifest != "")
builder.setMessageManifest(ByteString.copyFromUtf8(manifest))
case _
if (serializer.includeManifest)
builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName))
}
sel.elements.foreach {
case SelectChildName(name)
@ -66,11 +76,11 @@ class MessageContainerSerializer(val system: ExtendedActorSystem) extends BaseSe
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = {
val selectionEnvelope = ContainerFormats.SelectionEnvelope.parseFrom(bytes)
val msg = SerializationExtension(system).deserialize(
val manifest = if (selectionEnvelope.hasMessageManifest) selectionEnvelope.getMessageManifest.toStringUtf8 else ""
val msg = serialization.deserialize(
selectionEnvelope.getEnclosedMessage.toByteArray,
selectionEnvelope.getSerializerId,
if (selectionEnvelope.hasMessageManifest)
Some(system.dynamicAccess.getClassFor[AnyRef](selectionEnvelope.getMessageManifest.toStringUtf8).get) else None).get
manifest).get
import scala.collection.JavaConverters._
val elements: immutable.Iterable[SelectionPathElement] = selectionEnvelope.getPatternList.asScala.map { x