diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala
index 16b7953e21..f14313de84 100644
--- a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala
+++ b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala
@@ -10,6 +10,7 @@ import scala.collection.immutable
import akka.actor._
import akka.serialization.SerializationExtension
import akka.util.{ Unsafe, Helpers }
+import akka.serialization.SerializerWithStringManifest
private[akka] trait Children { this: ActorCell ⇒
@@ -190,7 +191,18 @@ private[akka] trait Children { this: ActorCell ⇒
props.args forall (arg ⇒
arg == null ||
arg.isInstanceOf[NoSerializationVerificationNeeded] ||
- ser.deserialize(ser.serialize(arg.asInstanceOf[AnyRef]).get, arg.getClass).get != null)
+ {
+ val o = arg.asInstanceOf[AnyRef]
+ val serializer = ser.findSerializerFor(o)
+ val bytes = serializer.toBinary(o)
+ serializer match {
+ case ser2: SerializerWithStringManifest ⇒
+ val manifest = ser2.manifest(o)
+ ser.deserialize(bytes, serializer.identifier, manifest).get != null
+ case _ ⇒
+ ser.deserialize(bytes, arg.getClass).get != null
+ }
+ })
} catch {
case NonFatal(e) ⇒ throw new IllegalArgumentException(s"pre-creation serialization check failed at [${cell.self.path}/$name]", e)
}
diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala
index b45ef9d59b..f404b43306 100644
--- a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala
+++ b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala
@@ -15,6 +15,7 @@ import scala.util.control.NonFatal
import scala.util.control.Exception.Catcher
import akka.dispatch.MailboxType
import akka.dispatch.ProducesMessageQueue
+import akka.serialization.SerializerWithStringManifest
private[akka] trait Dispatch { this: ActorCell ⇒
@@ -117,7 +118,15 @@ private[akka] trait Dispatch { this: ActorCell ⇒
}).asInstanceOf[AnyRef]
if (!unwrapped.isInstanceOf[NoSerializationVerificationNeeded]) {
val s = SerializationExtension(system)
- s.deserialize(s.serialize(unwrapped).get, unwrapped.getClass).get
+ val serializer = s.findSerializerFor(unwrapped)
+ val bytes = serializer.toBinary(unwrapped)
+ serializer match {
+ case ser2: SerializerWithStringManifest ⇒
+ val manifest = ser2.manifest(unwrapped)
+ s.deserialize(bytes, serializer.identifier, manifest).get != null
+ case _ ⇒
+ s.deserialize(bytes, unwrapped.getClass).get
+ }
}
}
dispatcher.dispatch(this, msg)
diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala
index 17e6625e80..09f29b2b50 100644
--- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala
+++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala
@@ -13,6 +13,7 @@ import java.io.NotSerializableException
import scala.util.{ Try, DynamicVariable, Failure }
import scala.collection.immutable
import scala.util.control.NonFatal
+import scala.util.Success
object Serialization {
@@ -91,7 +92,7 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
/**
* Deserializes the given array of bytes using the specified serializer id,
- * using the optional type hint to the Serializer and the optional ClassLoader ot load it into.
+ * using the optional type hint to the Serializer.
* Returns either the resulting object or an Exception if one was thrown.
*/
def deserialize[T](bytes: Array[Byte], serializerId: Int, clazz: Option[Class[_ <: T]]): Try[T] =
@@ -104,9 +105,37 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
serializer.fromBinary(bytes, clazz).asInstanceOf[T]
}
+ /**
+ * Deserializes the given array of bytes using the specified serializer id,
+ * using the optional type hint to the Serializer.
+ * Returns either the resulting object or an Exception if one was thrown.
+ */
+ def deserialize(bytes: Array[Byte], serializerId: Int, manifest: String): Try[AnyRef] =
+ Try {
+ val serializer = try serializerByIdentity(serializerId) catch {
+ case _: NoSuchElementException ⇒ throw new NotSerializableException(
+ s"Cannot find serializer with id [$serializerId]. The most probable reason is that the configuration entry " +
+ "akka.actor.serializers is not in synch between the two systems.")
+ }
+ serializer match {
+ case s2: SerializerWithStringManifest ⇒ s2.fromBinary(bytes, manifest)
+ case s1 ⇒
+ if (manifest == "")
+ s1.fromBinary(bytes, None)
+ else {
+ system.dynamicAccess.getClassFor[AnyRef](manifest) match {
+ case Success(classManifest) ⇒
+ s1.fromBinary(bytes, Some(classManifest))
+ case Failure(e) ⇒
+ throw new NotSerializableException(
+ s"Cannot find manifest class [$manifest] for serializer with id [$serializerId].")
+ }
+ }
+ }
+ }
+
/**
* Deserializes the given array of bytes using the specified type to look up what Serializer should be used.
- * You can specify an optional ClassLoader to load the object into.
* Returns either the resulting object or an Exception if one was thrown.
*/
def deserialize[T](bytes: Array[Byte], clazz: Class[T]): Try[T] =
@@ -118,10 +147,8 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
* Throws akka.ConfigurationException if no `serialization-bindings` is configured for the
* class of the object.
*/
- def findSerializerFor(o: AnyRef): Serializer = o match {
- case null ⇒ NullSerializer
- case other ⇒ serializerFor(other.getClass)
- }
+ def findSerializerFor(o: AnyRef): Serializer =
+ if (o eq null) NullSerializer else serializerFor(o.getClass)
/**
* Returns the configured Serializer for the given Class. The configured Serializer
@@ -205,5 +232,6 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
*/
val serializerByIdentity: Map[Int, Serializer] =
Map(NullSerializer.identifier -> NullSerializer) ++ serializers map { case (_, v) ⇒ (v.identifier, v) }
+
}
diff --git a/akka-actor/src/main/scala/akka/serialization/Serializer.scala b/akka-actor/src/main/scala/akka/serialization/Serializer.scala
index 429a465764..7cadd47acc 100644
--- a/akka-actor/src/main/scala/akka/serialization/Serializer.scala
+++ b/akka-actor/src/main/scala/akka/serialization/Serializer.scala
@@ -26,7 +26,7 @@ import akka.serialization.JavaSerializer.CurrentSystem
* load classes using reflection.
*
*
- * Be sure to always use the PropertyManager for loading classes! This is necessary to
+ * Be sure to always use the [[akka.actor.DynamicAccess]] for loading classes! This is necessary to
* avoid strange match errors and inequalities which arise from different class loaders loading
* the same class.
*/
@@ -65,6 +65,74 @@ trait Serializer {
final def fromBinary(bytes: Array[Byte], clazz: Class[_]): AnyRef = fromBinary(bytes, Option(clazz))
}
+/**
+ * A Serializer represents a bimap between an object and an array of bytes representing that object.
+ *
+ * For serialization of data that need to evolve over time the `SerializerWithStringManifest` is recommended instead
+ * of [[Serializer]] because the manifest (type hint) is a `String` instead of a `Class`. That means
+ * that the class can be moved/removed and the serializer can still deserialize old data by matching
+ * on the `String`. This is especially useful for Akka Persistence.
+ *
+ * The manifest string can also encode a version number that can be used in [[#fromBinary]] to
+ * deserialize in different ways to migrate old data to new domain objects.
+ *
+ * If the data was originally serialized with [[Serializer]] and in a later version of the
+ * system you change to `SerializerWithStringManifest` the manifest string will be the full class name if
+ * you used `includeManifest=true`, otherwise it will be the empty string.
+ *
+ * Serializers are loaded using reflection during [[akka.actor.ActorSystem]]
+ * start-up, where two constructors are tried in order:
+ *
+ *
+ * - taking exactly one argument of type [[akka.actor.ExtendedActorSystem]];
+ * this should be the preferred one because all reflective loading of classes
+ * during deserialization should use ExtendedActorSystem.dynamicAccess (see
+ * [[akka.actor.DynamicAccess]]), and
+ * - without arguments, which is only an option if the serializer does not
+ * load classes using reflection.
+ *
+ *
+ * Be sure to always use the [[akka.actor.DynamicAccess]] for loading classes! This is necessary to
+ * avoid strange match errors and inequalities which arise from different class loaders loading
+ * the same class.
+ */
+abstract class SerializerWithStringManifest extends Serializer {
+
+ /**
+ * Completely unique value to identify this implementation of Serializer, used to optimize network traffic.
+ * Values from 0 to 16 are reserved for Akka internal usage.
+ */
+ def identifier: Int
+
+ final override def includeManifest: Boolean = true
+
+ /**
+ * Return the manifest (type hint) that will be provided in the fromBinary method.
+ * Use `""` if manifest is not needed.
+ */
+ def manifest(o: AnyRef): String
+
+ /**
+ * Serializes the given object into an Array of Byte
+ */
+ def toBinary(o: AnyRef): Array[Byte]
+
+ /**
+ * Produces an object from an array of bytes, with an optional type-hint;
+ * the class should be loaded using ActorSystem.dynamicAccess.
+ */
+ def fromBinary(bytes: Array[Byte], manifest: String): AnyRef
+
+ final def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = {
+ val manifestString = manifest match {
+ case Some(c) ⇒ c.getName
+ case None ⇒ ""
+ }
+ fromBinary(bytes, manifestString)
+ }
+
+}
+
/**
* Base serializer trait with serialization identifiers configuration contract,
* when globally unique serialization identifier is configured in the `reference.conf`.
diff --git a/akka-actor/src/main/scala/akka/util/ByteString.scala b/akka-actor/src/main/scala/akka/util/ByteString.scala
index 2ae54536be..4300f46123 100644
--- a/akka-actor/src/main/scala/akka/util/ByteString.scala
+++ b/akka-actor/src/main/scala/akka/util/ByteString.scala
@@ -7,7 +7,6 @@ package akka.util
import java.io.{ ObjectInputStream, ObjectOutputStream }
import java.nio.{ ByteBuffer, ByteOrder }
import java.lang.{ Iterable ⇒ JIterable }
-
import scala.annotation.varargs
import scala.collection.IndexedSeqOptimized
import scala.collection.mutable.{ Builder, WrappedArray }
@@ -15,6 +14,7 @@ import scala.collection.immutable
import scala.collection.immutable.{ IndexedSeq, VectorBuilder }
import scala.collection.generic.CanBuildFrom
import scala.reflect.ClassTag
+import java.nio.charset.StandardCharsets
object ByteString {
@@ -42,7 +42,7 @@ object ByteString {
/**
* Creates a new ByteString by encoding a String as UTF-8.
*/
- def apply(string: String): ByteString = apply(string, "UTF-8")
+ def apply(string: String): ByteString = apply(string, UTF_8)
/**
* Creates a new ByteString by encoding a String with a charset.
@@ -79,6 +79,11 @@ object ByteString {
*/
def fromString(string: String, charset: String): ByteString = apply(string, charset)
+ /**
+ * Standard "UTF-8" charset
+ */
+ val UTF_8: String = StandardCharsets.UTF_8.name()
+
/**
* Creates a new ByteString by copying bytes out of a ByteBuffer.
*/
@@ -484,7 +489,7 @@ sealed abstract class ByteString extends IndexedSeq[Byte] with IndexedSeqOptimiz
/**
* Decodes this ByteString as a UTF-8 encoded String.
*/
- final def utf8String: String = decodeString("UTF-8")
+ final def utf8String: String = decodeString(ByteString.UTF_8)
/**
* Decodes this ByteString using a charset to produce a String.
@@ -539,7 +544,7 @@ object CompactByteString {
/**
* Creates a new CompactByteString by encoding a String as UTF-8.
*/
- def apply(string: String): CompactByteString = apply(string, "UTF-8")
+ def apply(string: String): CompactByteString = apply(string, ByteString.UTF_8)
/**
* Creates a new CompactByteString by encoding a String with a charset.
diff --git a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala
index 9f74c76dba..6171a5f7dc 100644
--- a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala
+++ b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala
@@ -7,34 +7,36 @@ package akka.cluster.metrics.protobuf
import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, ObjectOutputStream }
import java.util.zip.{ GZIPInputStream, GZIPOutputStream }
import java.{ lang ⇒ jl }
-
import akka.actor.{ Address, ExtendedActorSystem }
import akka.cluster.metrics.protobuf.msg.{ ClusterMetricsMessages ⇒ cm }
import akka.cluster.metrics.{ ClusterMetricsMessage, ClusterMetricsSettings, EWMA, Metric, MetricsGossip, MetricsGossipEnvelope, NodeMetrics }
import akka.serialization.BaseSerializer
import akka.util.ClassLoaderObjectInputStream
import com.google.protobuf.{ ByteString, MessageLite }
-
import scala.annotation.tailrec
import scala.collection.JavaConverters.{ asJavaIterableConverter, asScalaBufferConverter, setAsJavaSetConverter }
+import akka.serialization.SerializerWithStringManifest
/**
* Protobuf serializer for [[akka.cluster.metrics.ClusterMetricsMessage]] types.
*/
-class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer {
+class MessageSerializer(val system: ExtendedActorSystem) extends SerializerWithStringManifest with BaseSerializer {
private final val BufferSize = 4 * 1024
- private val fromBinaryMap = collection.immutable.HashMap[Class[_ <: ClusterMetricsMessage], Array[Byte] ⇒ AnyRef](
- classOf[MetricsGossipEnvelope] -> metricsGossipEnvelopeFromBinary)
+ private val MetricsGossipEnvelopeManifest = "a"
- override val includeManifest: Boolean = true
+ override def manifest(obj: AnyRef): String = obj match {
+ case _: MetricsGossipEnvelope ⇒ MetricsGossipEnvelopeManifest
+ case _ ⇒
+ throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
+ }
override def toBinary(obj: AnyRef): Array[Byte] = obj match {
case m: MetricsGossipEnvelope ⇒
compress(metricsGossipEnvelopeToProto(m))
case _ ⇒
- throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass}")
+ throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
}
def compress(msg: MessageLite): Array[Byte] = {
@@ -61,12 +63,10 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
out.toByteArray
}
- def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = clazz match {
- case Some(c) ⇒ fromBinaryMap.get(c.asInstanceOf[Class[ClusterMetricsMessage]]) match {
- case Some(f) ⇒ f(bytes)
- case None ⇒ throw new IllegalArgumentException(s"Unimplemented deserialization of message class $c in metrics")
- }
- case _ ⇒ throw new IllegalArgumentException("Need a metrics message class to be able to deserialize bytes in metrics")
+ override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
+ case MetricsGossipEnvelopeManifest ⇒ metricsGossipEnvelopeFromBinary(bytes)
+ case _ ⇒ throw new IllegalArgumentException(
+ s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}")
}
private def addressToProto(address: Address): cm.Address.Builder = address match {
diff --git a/akka-cluster-metrics/src/test/scala/akka/cluster/metrics/protobuf/MessageSerializerSpec.scala b/akka-cluster-metrics/src/test/scala/akka/cluster/metrics/protobuf/MessageSerializerSpec.scala
index 1f47016bdb..e68b9f8b94 100644
--- a/akka-cluster-metrics/src/test/scala/akka/cluster/metrics/protobuf/MessageSerializerSpec.scala
+++ b/akka-cluster-metrics/src/test/scala/akka/cluster/metrics/protobuf/MessageSerializerSpec.scala
@@ -24,7 +24,7 @@ class MessageSerializerSpec extends AkkaSpec(
def checkSerialization(obj: AnyRef): Unit = {
val blob = serializer.toBinary(obj)
- val ref = serializer.fromBinary(blob, obj.getClass)
+ val ref = serializer.fromBinary(blob, serializer.manifest(obj))
obj match {
case _ ⇒
ref should ===(obj)
diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala b/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala
index 770f572742..77a9d8bb24 100644
--- a/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala
+++ b/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala
@@ -27,40 +27,57 @@ import akka.serialization.Serialization
import akka.actor.ActorRef
import akka.serialization.SerializationExtension
import scala.collection.immutable.TreeMap
+import akka.serialization.SerializerWithStringManifest
/**
* Protobuf serializer of DistributedPubSubMediator messages.
*/
-class DistributedPubSubMessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer {
+class DistributedPubSubMessageSerializer(val system: ExtendedActorSystem)
+ extends SerializerWithStringManifest with BaseSerializer {
+
+ private lazy val serialization = SerializationExtension(system)
private final val BufferSize = 1024 * 4
- private val fromBinaryMap = collection.immutable.HashMap[Class[_ <: DistributedPubSubMessage], Array[Byte] ⇒ AnyRef](
- classOf[Status] -> statusFromBinary,
- classOf[Delta] -> deltaFromBinary,
- classOf[Send] -> sendFromBinary,
- classOf[SendToAll] -> sendToAllFromBinary,
- classOf[Publish] -> publishFromBinary)
+ private val StatusManifest = "A"
+ private val DeltaManifest = "B"
+ private val SendManifest = "C"
+ private val SendToAllManifest = "D"
+ private val PublishManifest = "E"
- def includeManifest: Boolean = true
+ private val fromBinaryMap = collection.immutable.HashMap[String, Array[Byte] ⇒ AnyRef](
+ StatusManifest -> statusFromBinary,
+ DeltaManifest -> deltaFromBinary,
+ SendManifest -> sendFromBinary,
+ SendToAllManifest -> sendToAllFromBinary,
+ PublishManifest -> publishFromBinary)
- def toBinary(obj: AnyRef): Array[Byte] = obj match {
+ override def manifest(obj: AnyRef): String = obj match {
+ case _: Status ⇒ StatusManifest
+ case _: Delta ⇒ DeltaManifest
+ case _: Send ⇒ SendManifest
+ case _: SendToAll ⇒ SendToAllManifest
+ case _: Publish ⇒ PublishManifest
+ case _ ⇒
+ throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
+ }
+
+ override def toBinary(obj: AnyRef): Array[Byte] = obj match {
case m: Status ⇒ compress(statusToProto(m))
case m: Delta ⇒ compress(deltaToProto(m))
case m: Send ⇒ sendToProto(m).toByteArray
case m: SendToAll ⇒ sendToAllToProto(m).toByteArray
case m: Publish ⇒ publishToProto(m).toByteArray
case _ ⇒
- throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass}")
+ throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
}
- def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = clazz match {
- case Some(c) ⇒ fromBinaryMap.get(c.asInstanceOf[Class[DistributedPubSubMessage]]) match {
+ override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
+ fromBinaryMap.get(manifest) match {
case Some(f) ⇒ f(bytes)
- case None ⇒ throw new IllegalArgumentException(s"Unimplemented deserialization of message class $c in DistributedPubSubMessageSerializer")
+ case None ⇒ throw new IllegalArgumentException(
+ s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]")
}
- case _ ⇒ throw new IllegalArgumentException("Need a message class to be able to deserialize bytes in DistributedPubSubMessageSerializer")
- }
def compress(msg: MessageLite): Array[Byte] = {
val bos = new ByteArrayOutputStream(BufferSize)
@@ -189,21 +206,30 @@ class DistributedPubSubMessageSerializer(val system: ExtendedActorSystem) extend
private def payloadToProto(msg: Any): dm.Payload = {
val m = msg.asInstanceOf[AnyRef]
- val msgSerializer = SerializationExtension(system).findSerializerFor(m)
+ val msgSerializer = serialization.findSerializerFor(m)
val builder = dm.Payload.newBuilder().
setEnclosedMessage(ByteString.copyFrom(msgSerializer.toBinary(m)))
.setSerializerId(msgSerializer.identifier)
- if (msgSerializer.includeManifest)
- builder.setMessageManifest(ByteString.copyFromUtf8(m.getClass.getName))
+
+ msgSerializer match {
+ case ser2: SerializerWithStringManifest ⇒
+ val manifest = ser2.manifest(m)
+ if (manifest != "")
+ builder.setMessageManifest(ByteString.copyFromUtf8(manifest))
+ case _ ⇒
+ if (msgSerializer.includeManifest)
+ builder.setMessageManifest(ByteString.copyFromUtf8(m.getClass.getName))
+ }
+
builder.build()
}
private def payloadFromProto(payload: dm.Payload): AnyRef = {
- SerializationExtension(system).deserialize(
+ val manifest = if (payload.hasMessageManifest) payload.getMessageManifest.toStringUtf8 else ""
+ serialization.deserialize(
payload.getEnclosedMessage.toByteArray,
payload.getSerializerId,
- if (payload.hasMessageManifest)
- Some(system.dynamicAccess.getClassFor[AnyRef](payload.getMessageManifest.toStringUtf8).get) else None).get
+ manifest).get
}
}
diff --git a/akka-cluster-tools/src/test/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializerSpec.scala b/akka-cluster-tools/src/test/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializerSpec.scala
index 43830d7ab4..6925b67e55 100644
--- a/akka-cluster-tools/src/test/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializerSpec.scala
+++ b/akka-cluster-tools/src/test/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializerSpec.scala
@@ -17,7 +17,7 @@ class DistributedPubSubMessageSerializerSpec extends AkkaSpec {
def checkSerialization(obj: AnyRef): Unit = {
val blob = serializer.toBinary(obj)
- val ref = serializer.fromBinary(blob, obj.getClass)
+ val ref = serializer.fromBinary(blob, serializer.manifest(obj))
ref should ===(obj)
}
diff --git a/akka-docs/rst/java/code/docs/serialization/SerializationDocTest.java b/akka-docs/rst/java/code/docs/serialization/SerializationDocTest.java
index 8aa1518064..7696b2bb4e 100644
--- a/akka-docs/rst/java/code/docs/serialization/SerializationDocTest.java
+++ b/akka-docs/rst/java/code/docs/serialization/SerializationDocTest.java
@@ -3,9 +3,14 @@
*/
package docs.serialization;
+import java.io.UnsupportedEncodingException;
+
import akka.testkit.JavaTestKit;
+
import org.junit.Test;
import static org.junit.Assert.*;
+import java.nio.charset.StandardCharsets;
+
//#imports
import akka.actor.*;
import akka.serialization.*;
@@ -49,6 +54,79 @@ public class SerializationDocTest {
}
//#my-own-serializer
+ static class Customer {
+ public final String name;
+
+ Customer(String name) {
+ this.name = name;
+ }
+ }
+
+ static class User {
+ public final String name;
+
+ User(String name) {
+ this.name = name;
+ }
+ }
+
+ static
+ //#my-own-serializer2
+ public class MyOwnSerializer2 extends SerializerWithStringManifest {
+
+ private static final String CUSTOMER_MANIFEST = "customer";
+ private static final String USER_MANIFEST = "user";
+ private static final String UTF_8 = StandardCharsets.UTF_8.name();
+
+ // Pick a unique identifier for your Serializer,
+ // you've got a couple of billions to choose from,
+ // 0 - 16 is reserved by Akka itself
+ @Override public int identifier() {
+ return 1234567;
+ }
+
+ @Override public String manifest(Object obj) {
+ if (obj instanceof Customer)
+ return CUSTOMER_MANIFEST;
+ else if (obj instanceof User)
+ return USER_MANIFEST;
+ else
+ throw new IllegalArgumentException("Unknow type: " + obj);
+ }
+
+ // "toBinary" serializes the given object to an Array of Bytes
+ @Override public byte[] toBinary(Object obj) {
+ // Put the real code that serializes the object here
+ try {
+ if (obj instanceof Customer)
+ return ((Customer) obj).name.getBytes(UTF_8);
+ else if (obj instanceof User)
+ return ((User) obj).name.getBytes(UTF_8);
+ else
+ throw new IllegalArgumentException("Unknow type: " + obj);
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ // "fromBinary" deserializes the given array,
+ // using the type hint
+ @Override public Object fromBinary(byte[] bytes, String manifest) {
+ // Put the real code that deserializes here
+ try {
+ if (manifest.equals(CUSTOMER_MANIFEST))
+ return new Customer(new String(bytes, UTF_8));
+ else if (manifest.equals(USER_MANIFEST))
+ return new User(new String(bytes, UTF_8));
+ else
+ throw new IllegalArgumentException("Unknow manifest: " + manifest);
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+ }
+//#my-own-serializer2
+
@Test public void serializeActorRefs() {
final ExtendedActorSystem extendedSystem = (ExtendedActorSystem)
ActorSystem.create("whatever");
diff --git a/akka-docs/rst/java/serialization.rst b/akka-docs/rst/java/serialization.rst
index 1706df00b1..7b60c5ee86 100644
--- a/akka-docs/rst/java/serialization.rst
+++ b/akka-docs/rst/java/serialization.rst
@@ -103,9 +103,38 @@ which is done by extending ``akka.serialization.JSerializer``, like this:
:include: my-own-serializer
:exclude: ...
+The manifest is a type hint so that the same serializer can be used for different
+classes. The manifest parameter in ``fromBinaryJava`` is the class of the object that
+was serialized. In ``fromBinary`` you can match on the class and deserialize the
+bytes to different objects.
+
Then you only need to fill in the blanks, bind it to a name in your :ref:`configuration` and then
list which classes that should be serialized using it.
+Serializer with String Manifest
+-------------------------------
+
+The ``Serializer`` illustrated above supports a class based manifest (type hint).
+For serialization of data that need to evolve over time the `SerializerWithStringManifest`
+is recommended instead of ``Serializer`` because the manifest (type hint) is a ``String``
+instead of a ``Class``. That means that the class can be moved/removed and the serializer
+can still deserialize old data by matching on the ``String``. This is especially useful
+for :ref:`persistence-java`.
+
+The manifest string can also encode a version number that can be used in ``fromBinary`` to
+deserialize in different ways to migrate old data to new domain objects.
+
+If the data was originally serialized with ``Serializer`` and in a later version of the
+system you change to ``SerializerWithStringManifest`` the manifest string will be the full
+class name if you used ``includeManifest=true``, otherwise it will be the empty string.
+
+This is how a ``SerializerWithStringManifest`` looks like:
+
+.. includecode:: code/docs/serialization/SerializationDocTest.java#my-own-serializer2
+
+You must also bind it to a name in your :ref:`configuration` and then list which classes
+that should be serialized using it.
+
Serializing ActorRefs
---------------------
diff --git a/akka-docs/rst/scala/code/docs/serialization/SerializationDocSpec.scala b/akka-docs/rst/scala/code/docs/serialization/SerializationDocSpec.scala
index d308e45707..638e88d7d0 100644
--- a/akka-docs/rst/scala/code/docs/serialization/SerializationDocSpec.scala
+++ b/akka-docs/rst/scala/code/docs/serialization/SerializationDocSpec.scala
@@ -15,12 +15,13 @@ package docs.serialization {
import akka.actor.ExtendedActorSystem
import akka.actor.Extension
import akka.actor.Address
+ import java.nio.charset.StandardCharsets
//#my-own-serializer
class MyOwnSerializer extends Serializer {
// This is whether "fromBinary" requires a "clazz" or not
- def includeManifest: Boolean = false
+ def includeManifest: Boolean = true
// Pick a unique identifier for your Serializer,
// you've got a couple of billions to choose from,
@@ -37,7 +38,6 @@ package docs.serialization {
// "fromBinary" deserializes the given array,
// using the type hint (if any, see "includeManifest" above)
- // into the optionally provided classLoader.
def fromBinary(bytes: Array[Byte],
clazz: Option[Class[_]]): AnyRef = {
// Put your code that deserializes here
@@ -48,8 +48,52 @@ package docs.serialization {
}
//#my-own-serializer
+ //#my-own-serializer2
+ class MyOwnSerializer2 extends SerializerWithStringManifest {
+
+ val CustomerManifest = "customer"
+ val UserManifest = "user"
+ val UTF_8 = StandardCharsets.UTF_8.name()
+
+ // Pick a unique identifier for your Serializer,
+ // you've got a couple of billions to choose from,
+ // 0 - 16 is reserved by Akka itself
+ def identifier = 1234567
+
+ // The manifest (type hint) that will be provided in the fromBinary method
+ // Use `""` if manifest is not needed.
+ def manifest(obj: AnyRef): String =
+ obj match {
+ case _: Customer => CustomerManifest
+ case _: User => UserManifest
+ }
+
+ // "toBinary" serializes the given object to an Array of Bytes
+ def toBinary(obj: AnyRef): Array[Byte] = {
+ // Put the real code that serializes the object here
+ obj match {
+ case Customer(name) => name.getBytes(UTF_8)
+ case User(name) => name.getBytes(UTF_8)
+ }
+ }
+
+ // "fromBinary" deserializes the given array,
+ // using the type hint
+ def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
+ // Put the real code that deserializes here
+ manifest match {
+ case CustomerManifest =>
+ Customer(new String(bytes, UTF_8))
+ case UserManifest =>
+ User(new String(bytes, UTF_8))
+ }
+ }
+ }
+ //#my-own-serializer2
+
trait MyOwnSerializable
final case class Customer(name: String) extends MyOwnSerializable
+ final case class User(name: String) extends MyOwnSerializable
class SerializationDocSpec extends AkkaSpec {
"demonstrate configuration of serialize messages" in {
diff --git a/akka-docs/rst/scala/serialization.rst b/akka-docs/rst/scala/serialization.rst
index 758b8c92a8..d4d1da4301 100644
--- a/akka-docs/rst/scala/serialization.rst
+++ b/akka-docs/rst/scala/serialization.rst
@@ -95,9 +95,38 @@ First you need to create a class definition of your ``Serializer`` like so:
:include: imports,my-own-serializer
:exclude: ...
+The manifest is a type hint so that the same serializer can be used for different
+classes. The manifest parameter in ``fromBinary`` is the class of the object that
+was serialized. In ``fromBinary`` you can match on the class and deserialize the
+bytes to different objects.
+
Then you only need to fill in the blanks, bind it to a name in your :ref:`configuration` and then
list which classes that should be serialized using it.
+Serializer with String Manifest
+-------------------------------
+
+The ``Serializer`` illustrated above supports a class based manifest (type hint).
+For serialization of data that need to evolve over time the `SerializerWithStringManifest`
+is recommended instead of ``Serializer`` because the manifest (type hint) is a ``String``
+instead of a ``Class``. That means that the class can be moved/removed and the serializer
+can still deserialize old data by matching on the ``String``. This is especially useful
+for :ref:`persistence-scala`.
+
+The manifest string can also encode a version number that can be used in ``fromBinary`` to
+deserialize in different ways to migrate old data to new domain objects.
+
+If the data was originally serialized with ``Serializer`` and in a later version of the
+system you change to ``SerializerWithStringManifest`` the manifest string will be the full
+class name if you used ``includeManifest=true``, otherwise it will be the empty string.
+
+This is how a ``SerializerWithStringManifest`` looks like:
+
+.. includecode:: code/docs/serialization/SerializationDocSpec.scala#my-own-serializer2
+
+You must also bind it to a name in your :ref:`configuration` and then list which classes
+that should be serialized using it.
+
Serializing ActorRefs
---------------------
diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbIdMapping.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbIdMapping.scala
index 87e1e54a10..6fbec39b0a 100644
--- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbIdMapping.scala
+++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbIdMapping.scala
@@ -7,6 +7,7 @@ package akka.persistence.journal.leveldb
import org.iq80.leveldb.DBIterator
import akka.actor.Actor
+import akka.util.ByteString.UTF_8
/**
* INTERNAL API.
@@ -38,7 +39,7 @@ private[persistence] trait LeveldbIdMapping extends Actor { this: LeveldbStore
val nextEntry = iter.next()
val nextKey = keyFromBytes(nextEntry.getKey)
if (!isMappingKey(nextKey)) pathMap else {
- val nextVal = new String(nextEntry.getValue, "UTF-8")
+ val nextVal = new String(nextEntry.getValue, UTF_8)
readIdMap(pathMap + (nextVal -> nextKey.mappingId), iter)
}
}
@@ -46,7 +47,7 @@ private[persistence] trait LeveldbIdMapping extends Actor { this: LeveldbStore
private def writeIdMapping(id: String, numericId: Int): Int = {
idMap = idMap + (id -> numericId)
- leveldb.put(keyToBytes(mappingKey(numericId)), id.getBytes("UTF-8"))
+ leveldb.put(keyToBytes(mappingKey(numericId)), id.getBytes(UTF_8))
numericId
}
diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala
index a00484917f..169db11cfc 100644
--- a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala
+++ b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala
@@ -34,6 +34,8 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
val AtLeastOnceDeliverySnapshotClass = classOf[AtLeastOnceDeliverySnap]
val PersistentStateChangeEventClass = classOf[StateChangeEvent]
+ private lazy val serialization = SerializationExtension(system)
+
override val includeManifest: Boolean = true
private lazy val transportInformation: Option[Serialization.Information] = {
@@ -127,10 +129,18 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
private def persistentPayloadBuilder(payload: AnyRef) = {
def payloadBuilder() = {
- val serializer = SerializationExtension(system).findSerializerFor(payload)
+ val serializer = serialization.findSerializerFor(payload)
val builder = PersistentPayload.newBuilder()
- if (serializer.includeManifest) builder.setPayloadManifest(ByteString.copyFromUtf8(payload.getClass.getName))
+ serializer match {
+ case ser2: SerializerWithStringManifest ⇒
+ val manifest = ser2.manifest(payload)
+ if (manifest != "")
+ builder.setPayloadManifest(ByteString.copyFromUtf8(manifest))
+ case _ ⇒
+ if (serializer.includeManifest)
+ builder.setPayloadManifest(ByteString.copyFromUtf8(payload.getClass.getName))
+ }
builder.setPayload(ByteString.copyFrom(serializer.toBinary(payload)))
builder.setSerializerId(serializer.identifier)
@@ -158,13 +168,13 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
}
private def payload(persistentPayload: PersistentPayload): Any = {
- val payloadClass = if (persistentPayload.hasPayloadManifest)
- Some(system.dynamicAccess.getClassFor[AnyRef](persistentPayload.getPayloadManifest.toStringUtf8).get) else None
+ val manifest = if (persistentPayload.hasPayloadManifest)
+ persistentPayload.getPayloadManifest.toStringUtf8 else ""
- SerializationExtension(system).deserialize(
+ serialization.deserialize(
persistentPayload.getPayload.toByteArray,
persistentPayload.getSerializerId,
- payloadClass).get
+ manifest).get
}
}
diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala
index 0665f06688..ffda65e83f 100644
--- a/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala
+++ b/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala
@@ -8,6 +8,7 @@ package akka.persistence.serialization
import java.io._
import akka.actor._
import akka.serialization._
+import akka.util.ByteString.UTF_8
import scala.util.Success
import scala.util.Failure
@@ -33,6 +34,8 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer
override val includeManifest: Boolean = false
+ private lazy val serialization = SerializationExtension(system)
+
private lazy val transportInformation: Option[Serialization.Information] = {
val address = system.provider.getDefaultAddress
if (address.hasLocalScope) None
@@ -57,14 +60,21 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer
private def snapshotToBinary(snapshot: AnyRef): Array[Byte] = {
def serialize() = {
- val extension = SerializationExtension(system)
-
- val snapshotSerializer = extension.findSerializerFor(snapshot)
+ val snapshotSerializer = serialization.findSerializerFor(snapshot)
val headerOut = new ByteArrayOutputStream
writeInt(headerOut, snapshotSerializer.identifier)
- if (snapshotSerializer.includeManifest)
- headerOut.write(snapshot.getClass.getName.getBytes("utf-8"))
+
+ snapshotSerializer match {
+ case ser2: SerializerWithStringManifest ⇒
+ val manifest = ser2.manifest(snapshot)
+ if (manifest != "")
+ headerOut.write(manifest.getBytes(UTF_8))
+ case _ ⇒
+ if (snapshotSerializer.includeManifest)
+ headerOut.write(snapshot.getClass.getName.getBytes(UTF_8))
+ }
+
val headerBytes = headerOut.toByteArray
val out = new ByteArrayOutputStream
@@ -84,8 +94,6 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer
}
private def snapshotFromBinary(bytes: Array[Byte]): AnyRef = {
- val extension = SerializationExtension(system)
-
val in = new ByteArrayInputStream(bytes)
val headerLength = readInt(in)
val headerBytes = bytes.slice(4, headerLength + 4)
@@ -122,7 +130,7 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer
val oldHeader =
if (readShort(in) == 0xedac) { // Java Serialization magic value with swapped bytes
val b = if (SnapshotSerializer.doPatch) patch(headerBytes) else headerBytes
- extension.deserialize(b, classOf[SnapshotHeader]).toOption
+ serialization.deserialize(b, classOf[SnapshotHeader]).toOption
} else None
val header = oldHeader.getOrElse {
@@ -134,13 +142,12 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer
else {
val manifestBytes = Array.ofDim[Byte](remaining)
headerIn.read(manifestBytes)
- Some(new String(manifestBytes, "utf-8"))
+ Some(new String(manifestBytes, UTF_8))
}
SnapshotHeader(serializerId, manifest)
}
- val manifest = header.manifest.map(system.dynamicAccess.getClassFor[AnyRef](_).get)
- extension.deserialize[AnyRef](snapshotBytes, header.serializerId, manifest).get
+ serialization.deserialize(snapshotBytes, header.serializerId, header.manifest.getOrElse("")).get
}
private def writeInt(outputStream: OutputStream, i: Int) =
@@ -205,4 +212,4 @@ object SnapshotSerializer {
} else false
} else false
}
-}
\ No newline at end of file
+}
diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala
index 2f88fc4dcd..7b5c8e2792 100644
--- a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala
+++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala
@@ -17,6 +17,7 @@ import akka.persistence._
import akka.persistence.snapshot._
import akka.persistence.serialization._
import akka.serialization.SerializationExtension
+import akka.util.ByteString.UTF_8
/**
* INTERNAL API.
@@ -102,13 +103,13 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo
try { p(stream) } finally { stream.close() }
private def snapshotFile(metadata: SnapshotMetadata, extension: String = ""): File =
- new File(snapshotDir, s"snapshot-${URLEncoder.encode(metadata.persistenceId, "UTF-8")}-${metadata.sequenceNr}-${metadata.timestamp}${extension}")
+ new File(snapshotDir, s"snapshot-${URLEncoder.encode(metadata.persistenceId, UTF_8)}-${metadata.sequenceNr}-${metadata.timestamp}${extension}")
private def snapshotMetadata(persistenceId: String, criteria: SnapshotSelectionCriteria): immutable.Seq[SnapshotMetadata] = {
val files = snapshotDir.listFiles(new SnapshotFilenameFilter(persistenceId))
if (files eq null) Nil // if the dir was removed
else files.map(_.getName).collect {
- case FilenamePattern(pid, snr, tms) ⇒ SnapshotMetadata(URLDecoder.decode(pid, "UTF-8"), snr.toLong, tms.toLong)
+ case FilenamePattern(pid, snr, tms) ⇒ SnapshotMetadata(URLDecoder.decode(pid, UTF_8), snr.toLong, tms.toLong)
}.filter(md ⇒ criteria.matches(md) && !saving.contains(md)).toVector
}
diff --git a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala
index aecadcc105..36c42869fa 100644
--- a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala
+++ b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala
@@ -10,24 +10,34 @@ import akka.actor._
import akka.persistence._
import akka.serialization._
import akka.testkit._
-
import akka.persistence.AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot
import akka.persistence.AtLeastOnceDelivery.UnconfirmedDelivery
+import akka.util.ByteString.UTF_8
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import org.apache.commons.codec.binary.Hex.decodeHex
+import SerializerSpecConfigs._
+
object SerializerSpecConfigs {
val customSerializers = ConfigFactory.parseString(
"""
akka.actor {
serializers {
my-payload = "akka.persistence.serialization.MyPayloadSerializer"
+ my-payload2 = "akka.persistence.serialization.MyPayload2Serializer"
my-snapshot = "akka.persistence.serialization.MySnapshotSerializer"
+ my-snapshot2 = "akka.persistence.serialization.MySnapshotSerializer2"
+ old-payload = "akka.persistence.serialization.OldPayloadSerializer"
}
serialization-bindings {
"akka.persistence.serialization.MyPayload" = my-payload
+ "akka.persistence.serialization.MyPayload2" = my-payload2
"akka.persistence.serialization.MySnapshot" = my-snapshot
+ "akka.persistence.serialization.MySnapshot2" = my-snapshot2
+ # this entry was used when creating the data for the test
+ # "deserialize data when class is removed"
+ #"akka.persistence.serialization.OldPayload" = old-payload
}
}
""")
@@ -53,6 +63,7 @@ object SerializerSpecConfigs {
def config(configs: String*): Config =
configs.foldLeft(ConfigFactory.empty)((r, c) ⇒ r.withFallback(ConfigFactory.parseString(c)))
+
}
import SerializerSpecConfigs._
@@ -71,9 +82,19 @@ class SnapshotSerializerPersistenceSpec extends AkkaSpec(customSerializers) {
deserialized should ===(Snapshot(MySnapshot(".a.")))
}
+ "handle custom snapshot Serialization with string manifest" in {
+ val wrapped = Snapshot(MySnapshot2("a"))
+ val serializer = serialization.findSerializerFor(wrapped)
+
+ val bytes = serializer.toBinary(wrapped)
+ val deserialized = serializer.fromBinary(bytes, None)
+
+ deserialized should ===(Snapshot(MySnapshot2(".a.")))
+ }
+
"be able to read snapshot created with akka 2.3.6 and Scala 2.10" in {
val dataStr = "abc"
- val snapshot = Snapshot(dataStr.getBytes("utf-8"))
+ val snapshot = Snapshot(dataStr.getBytes(UTF_8))
val serializer = serialization.findSerializerFor(snapshot)
// the oldSnapshot was created with Akka 2.3.6 and it is using JavaSerialization
@@ -91,13 +112,13 @@ class SnapshotSerializerPersistenceSpec extends AkkaSpec(customSerializers) {
val bytes = decodeHex(oldSnapshot.toCharArray)
val deserialized = serializer.fromBinary(bytes, None).asInstanceOf[Snapshot]
- val deserializedDataStr = new String(deserialized.data.asInstanceOf[Array[Byte]], "utf-8")
+ val deserializedDataStr = new String(deserialized.data.asInstanceOf[Array[Byte]], UTF_8)
dataStr should ===(deserializedDataStr)
}
"be able to read snapshot created with akka 2.3.6 and Scala 2.11" in {
val dataStr = "abc"
- val snapshot = Snapshot(dataStr.getBytes("utf-8"))
+ val snapshot = Snapshot(dataStr.getBytes(UTF_8))
val serializer = serialization.findSerializerFor(snapshot)
// the oldSnapshot was created with Akka 2.3.6 and it is using JavaSerialization
@@ -115,7 +136,7 @@ class SnapshotSerializerPersistenceSpec extends AkkaSpec(customSerializers) {
val bytes = decodeHex(oldSnapshot.toCharArray)
val deserialized = serializer.fromBinary(bytes, None).asInstanceOf[Snapshot]
- val deserializedDataStr = new String(deserialized.data.asInstanceOf[Array[Byte]], "utf-8")
+ val deserializedDataStr = new String(deserialized.data.asInstanceOf[Array[Byte]], UTF_8)
dataStr should ===(deserializedDataStr)
}
}
@@ -136,6 +157,7 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) {
deserialized should ===(persistent.withPayload(MyPayload(".a.")))
}
}
+
"given a PersistentRepr manifest" must {
"handle custom Persistent message serialization" in {
val persistent = PersistentRepr(MyPayload("b"), 13, "p1", true, testActor)
@@ -148,6 +170,57 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) {
}
}
+ "given payload serializer with string manifest" must {
+ "handle serialization" in {
+ val persistent = PersistentRepr(MyPayload2("a", 17), 13, "p1", true, testActor)
+ val serializer = serialization.findSerializerFor(persistent)
+
+ val bytes = serializer.toBinary(persistent)
+ val deserialized = serializer.fromBinary(bytes, None)
+
+ deserialized should ===(persistent.withPayload(MyPayload2(".a.", 17)))
+ }
+
+ "be able to evolve the data types" in {
+ val oldEvent = MyPayload("a")
+ val serializer1 = serialization.findSerializerFor(oldEvent)
+ val bytes = serializer1.toBinary(oldEvent)
+
+ // now the system is updated to version 2 with new class MyPayload2
+ // and MyPayload2Serializer that handles migration from old MyPayload
+ val serializer2 = serialization.serializerFor(classOf[MyPayload2])
+ val deserialized = serializer2.fromBinary(bytes, Some(oldEvent.getClass))
+
+ deserialized should be(MyPayload2(".a.", 0))
+ }
+
+ "be able to deserialize data when class is removed" in {
+ val serializer = serialization.findSerializerFor(PersistentRepr("x", 13, "p1", true, testActor))
+
+ // It was created with:
+ // val old = PersistentRepr(OldPayload('A'), 13, "p1", true, testActor)
+ // import org.apache.commons.codec.binary.Hex._
+ // println(s"encoded OldPayload: " + String.valueOf(encodeHex(serializer.toBinary(old))))
+ //
+ val oldData =
+ "0a3e08c7da04120d4f6c645061796c6f61642841291a2" +
+ "9616b6b612e70657273697374656e63652e7365726961" +
+ "6c697a6174696f6e2e4f6c645061796c6f6164100d1a0" +
+ "2703120015a45616b6b613a2f2f4d6573736167655365" +
+ "7269616c697a657250657273697374656e63655370656" +
+ "32f73797374656d2f746573744163746f722d31233133" +
+ "3137373931343033"
+
+ // now the system is updated, OldPayload is replaced by MyPayload, and the
+ // OldPayloadSerializer is adjusted to migrate OldPayload
+ val bytes = decodeHex(oldData.toCharArray)
+
+ val deserialized = serializer.fromBinary(bytes, None).asInstanceOf[PersistentRepr]
+
+ deserialized.payload should be(MyPayload("OldPayload(A)"))
+ }
+ }
+
"given AtLeastOnceDeliverySnapshot" must {
"handle empty unconfirmed" in {
val unconfirmed = Vector.empty
@@ -173,7 +246,6 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) {
deserialized should ===(snap)
}
-
}
}
@@ -222,7 +294,13 @@ class MessageSerializerRemotingSpec extends AkkaSpec(remote.withFallback(customS
}
final case class MyPayload(data: String)
+final case class MyPayload2(data: String, n: Int)
final case class MySnapshot(data: String)
+final case class MySnapshot2(data: String)
+
+// this class was used when creating the data for the test
+// "deserialize data when class is removed"
+//final case class OldPayload(c: Char)
class MyPayloadSerializer extends Serializer {
val MyPayloadClass = classOf[MyPayload]
@@ -231,16 +309,41 @@ class MyPayloadSerializer extends Serializer {
def includeManifest: Boolean = true
def toBinary(o: AnyRef): Array[Byte] = o match {
- case MyPayload(data) ⇒ s".${data}".getBytes("UTF-8")
+ case MyPayload(data) ⇒ s".${data}".getBytes(UTF_8)
}
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = manifest match {
- case Some(MyPayloadClass) ⇒ MyPayload(s"${new String(bytes, "UTF-8")}.")
+ case Some(MyPayloadClass) ⇒ MyPayload(s"${new String(bytes, UTF_8)}.")
case Some(c) ⇒ throw new Exception(s"unexpected manifest ${c}")
case None ⇒ throw new Exception("no manifest")
}
}
+class MyPayload2Serializer extends SerializerWithStringManifest {
+ val MyPayload2Class = classOf[MyPayload]
+
+ val ManifestV1 = classOf[MyPayload].getName
+ val ManifestV2 = "MyPayload-V2"
+
+ def identifier: Int = 77125
+
+ def manifest(o: AnyRef): String = ManifestV2
+
+ def toBinary(o: AnyRef): Array[Byte] = o match {
+ case MyPayload2(data, n) ⇒ s".$data:$n".getBytes(UTF_8)
+ }
+
+ def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
+ case ManifestV2 ⇒
+ val parts = new String(bytes, UTF_8).split(":")
+ MyPayload2(data = parts(0) + ".", n = parts(1).toInt)
+ case ManifestV1 ⇒
+ MyPayload2(data = s"${new String(bytes, UTF_8)}.", n = 0)
+ case other ⇒
+ throw new Exception(s"unexpected manifest [$other]")
+ }
+}
+
class MySnapshotSerializer extends Serializer {
val MySnapshotClass = classOf[MySnapshot]
@@ -248,12 +351,55 @@ class MySnapshotSerializer extends Serializer {
def includeManifest: Boolean = true
def toBinary(o: AnyRef): Array[Byte] = o match {
- case MySnapshot(data) ⇒ s".${data}".getBytes("UTF-8")
+ case MySnapshot(data) ⇒ s".${data}".getBytes(UTF_8)
}
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = manifest match {
- case Some(MySnapshotClass) ⇒ MySnapshot(s"${new String(bytes, "UTF-8")}.")
+ case Some(MySnapshotClass) ⇒ MySnapshot(s"${new String(bytes, UTF_8)}.")
case Some(c) ⇒ throw new Exception(s"unexpected manifest ${c}")
case None ⇒ throw new Exception("no manifest")
}
}
+
+class MySnapshotSerializer2 extends SerializerWithStringManifest {
+ val CurrentManifest = "MySnapshot-V2"
+ val OldManifest = classOf[MySnapshot].getName
+
+ def identifier: Int = 77126
+
+ def manifest(o: AnyRef): String = CurrentManifest
+
+ def toBinary(o: AnyRef): Array[Byte] = o match {
+ case MySnapshot2(data) ⇒ s".${data}".getBytes(UTF_8)
+ }
+
+ def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
+ case CurrentManifest | OldManifest ⇒
+ MySnapshot2(s"${new String(bytes, UTF_8)}.")
+ case other ⇒
+ throw new Exception(s"unexpected manifest [$other]")
+ }
+}
+
+class OldPayloadSerializer extends SerializerWithStringManifest {
+
+ def identifier: Int = 77127
+ val OldPayloadClassName = "akka.persistence.serialization.OldPayload"
+ val MyPayloadClassName = classOf[MyPayload].getName
+
+ def manifest(o: AnyRef): String = o.getClass.getName
+
+ def toBinary(o: AnyRef): Array[Byte] = o match {
+ case MyPayload(data) ⇒ s".${data}".getBytes(UTF_8)
+ case old if old.getClass.getName == OldPayloadClassName ⇒
+ o.toString.getBytes(UTF_8)
+ }
+
+ def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
+ case OldPayloadClassName ⇒
+ MyPayload(new String(bytes, UTF_8))
+ case MyPayloadClassName ⇒ MyPayload(s"${new String(bytes, UTF_8)}.")
+ case other ⇒
+ throw new Exception(s"unexpected manifest [$other]")
+ }
+}
diff --git a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala
index 71fee373f6..2f70eed317 100644
--- a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala
+++ b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala
@@ -8,6 +8,7 @@ import akka.remote.WireFormats._
import com.google.protobuf.ByteString
import akka.actor.ExtendedActorSystem
import akka.serialization.SerializationExtension
+import akka.serialization.SerializerWithStringManifest
/**
* INTERNAL API
@@ -23,7 +24,7 @@ private[akka] object MessageSerializer {
SerializationExtension(system).deserialize(
messageProtocol.getMessage.toByteArray,
messageProtocol.getSerializerId,
- if (messageProtocol.hasMessageManifest) Some(system.dynamicAccess.getClassFor[AnyRef](messageProtocol.getMessageManifest.toStringUtf8).get) else None).get
+ if (messageProtocol.hasMessageManifest) messageProtocol.getMessageManifest.toStringUtf8 else "").get
}
/**
@@ -35,8 +36,15 @@ private[akka] object MessageSerializer {
val builder = SerializedMessage.newBuilder
builder.setMessage(ByteString.copyFrom(serializer.toBinary(message)))
builder.setSerializerId(serializer.identifier)
- if (serializer.includeManifest)
- builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName))
+ serializer match {
+ case ser2: SerializerWithStringManifest ⇒
+ val manifest = ser2.manifest(message)
+ if (manifest != "")
+ builder.setMessageManifest(ByteString.copyFromUtf8(manifest))
+ case _ ⇒
+ if (serializer.includeManifest)
+ builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName))
+ }
builder.build
}
}
diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala
index 1f5465d136..0066d05ab9 100644
--- a/akka-remote/src/main/scala/akka/remote/Remoting.scala
+++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala
@@ -22,12 +22,13 @@ import scala.util.{ Failure, Success }
import akka.remote.transport.AkkaPduCodec.Message
import java.util.concurrent.ConcurrentHashMap
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
+import akka.util.ByteString.UTF_8
/**
* INTERNAL API
*/
private[remote] object AddressUrlEncoder {
- def apply(address: Address): String = URLEncoder.encode(address.toString, "utf-8")
+ def apply(address: Address): String = URLEncoder.encode(address.toString, UTF_8)
}
/**
diff --git a/akka-remote/src/main/scala/akka/remote/serialization/MessageContainerSerializer.scala b/akka-remote/src/main/scala/akka/remote/serialization/MessageContainerSerializer.scala
index ced74c719e..fca385685a 100644
--- a/akka-remote/src/main/scala/akka/remote/serialization/MessageContainerSerializer.scala
+++ b/akka-remote/src/main/scala/akka/remote/serialization/MessageContainerSerializer.scala
@@ -14,12 +14,15 @@ import akka.actor.SelectionPathElement
import akka.remote.ContainerFormats
import akka.serialization.SerializationExtension
import akka.serialization.BaseSerializer
+import akka.serialization.SerializerWithStringManifest
class MessageContainerSerializer(val system: ExtendedActorSystem) extends BaseSerializer {
@deprecated("Use constructor with ExtendedActorSystem", "2.4")
def this() = this(null)
+ private lazy val serialization = SerializationExtension(system)
+
// TODO remove this when deprecated this() is removed
override val identifier: Int =
if (system eq null) 6
@@ -37,14 +40,21 @@ class MessageContainerSerializer(val system: ExtendedActorSystem) extends BaseSe
private def serializeSelection(sel: ActorSelectionMessage): Array[Byte] = {
val builder = ContainerFormats.SelectionEnvelope.newBuilder()
val message = sel.msg.asInstanceOf[AnyRef]
- val serializer = SerializationExtension(system).findSerializerFor(message)
+ val serializer = serialization.findSerializerFor(message)
builder.
setEnclosedMessage(ByteString.copyFrom(serializer.toBinary(message))).
setSerializerId(serializer.identifier).
setWildcardFanOut(sel.wildcardFanOut)
- if (serializer.includeManifest)
- builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName))
+ serializer match {
+ case ser2: SerializerWithStringManifest ⇒
+ val manifest = ser2.manifest(message)
+ if (manifest != "")
+ builder.setMessageManifest(ByteString.copyFromUtf8(manifest))
+ case _ ⇒
+ if (serializer.includeManifest)
+ builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName))
+ }
sel.elements.foreach {
case SelectChildName(name) ⇒
@@ -66,11 +76,11 @@ class MessageContainerSerializer(val system: ExtendedActorSystem) extends BaseSe
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = {
val selectionEnvelope = ContainerFormats.SelectionEnvelope.parseFrom(bytes)
- val msg = SerializationExtension(system).deserialize(
+ val manifest = if (selectionEnvelope.hasMessageManifest) selectionEnvelope.getMessageManifest.toStringUtf8 else ""
+ val msg = serialization.deserialize(
selectionEnvelope.getEnclosedMessage.toByteArray,
selectionEnvelope.getSerializerId,
- if (selectionEnvelope.hasMessageManifest)
- Some(system.dynamicAccess.getClassFor[AnyRef](selectionEnvelope.getMessageManifest.toStringUtf8).get) else None).get
+ manifest).get
import scala.collection.JavaConverters._
val elements: immutable.Iterable[SelectionPathElement] = selectionEnvelope.getPatternList.asScala.map { x ⇒