From 740f006a3864afcedecb6b76a1037b0dd82e4643 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 28 May 2015 18:42:22 +0200 Subject: [PATCH] +act #17576 Support serializer with string manifest * useful when evolution is needed, e.g. Akka Persistence * docs, comments, cluster-metrics and cluster-tools serializers --- .../scala/akka/actor/dungeon/Children.scala | 14 +- .../scala/akka/actor/dungeon/Dispatch.scala | 11 +- .../akka/serialization/Serialization.scala | 40 ++++- .../scala/akka/serialization/Serializer.scala | 70 +++++++- .../src/main/scala/akka/util/ByteString.scala | 13 +- .../metrics/protobuf/MessageSerializer.scala | 26 +-- .../protobuf/MessageSerializerSpec.scala | 2 +- .../DistributedPubSubMessageSerializer.scala | 68 ++++--- ...stributedPubSubMessageSerializerSpec.scala | 2 +- .../serialization/SerializationDocTest.java | 78 ++++++++ akka-docs/rst/java/serialization.rst | 29 +++ .../serialization/SerializationDocSpec.scala | 48 ++++- akka-docs/rst/scala/serialization.rst | 29 +++ .../journal/leveldb/LeveldbIdMapping.scala | 5 +- .../serialization/MessageSerializer.scala | 22 ++- .../serialization/SnapshotSerializer.scala | 31 ++-- .../snapshot/local/LocalSnapshotStore.scala | 5 +- .../serialization/SerializerSpec.scala | 166 ++++++++++++++++-- .../scala/akka/remote/MessageSerializer.scala | 14 +- .../src/main/scala/akka/remote/Remoting.scala | 3 +- .../MessageContainerSerializer.scala | 22 ++- 21 files changed, 605 insertions(+), 93 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala index 16b7953e21..f14313de84 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala @@ -10,6 +10,7 @@ import scala.collection.immutable import akka.actor._ import akka.serialization.SerializationExtension import akka.util.{ Unsafe, Helpers } +import akka.serialization.SerializerWithStringManifest private[akka] trait Children { this: ActorCell ⇒ @@ -190,7 +191,18 @@ private[akka] trait Children { this: ActorCell ⇒ props.args forall (arg ⇒ arg == null || arg.isInstanceOf[NoSerializationVerificationNeeded] || - ser.deserialize(ser.serialize(arg.asInstanceOf[AnyRef]).get, arg.getClass).get != null) + { + val o = arg.asInstanceOf[AnyRef] + val serializer = ser.findSerializerFor(o) + val bytes = serializer.toBinary(o) + serializer match { + case ser2: SerializerWithStringManifest ⇒ + val manifest = ser2.manifest(o) + ser.deserialize(bytes, serializer.identifier, manifest).get != null + case _ ⇒ + ser.deserialize(bytes, arg.getClass).get != null + } + }) } catch { case NonFatal(e) ⇒ throw new IllegalArgumentException(s"pre-creation serialization check failed at [${cell.self.path}/$name]", e) } diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala index b45ef9d59b..f404b43306 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala @@ -15,6 +15,7 @@ import scala.util.control.NonFatal import scala.util.control.Exception.Catcher import akka.dispatch.MailboxType import akka.dispatch.ProducesMessageQueue +import akka.serialization.SerializerWithStringManifest private[akka] trait Dispatch { this: ActorCell ⇒ @@ -117,7 +118,15 @@ private[akka] trait Dispatch { this: ActorCell ⇒ }).asInstanceOf[AnyRef] if (!unwrapped.isInstanceOf[NoSerializationVerificationNeeded]) { val s = SerializationExtension(system) - s.deserialize(s.serialize(unwrapped).get, unwrapped.getClass).get + val serializer = s.findSerializerFor(unwrapped) + val bytes = serializer.toBinary(unwrapped) + serializer match { + case ser2: SerializerWithStringManifest ⇒ + val manifest = ser2.manifest(unwrapped) + s.deserialize(bytes, serializer.identifier, manifest).get != null + case _ ⇒ + s.deserialize(bytes, unwrapped.getClass).get + } } } dispatcher.dispatch(this, msg) diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala index 17e6625e80..09f29b2b50 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala @@ -13,6 +13,7 @@ import java.io.NotSerializableException import scala.util.{ Try, DynamicVariable, Failure } import scala.collection.immutable import scala.util.control.NonFatal +import scala.util.Success object Serialization { @@ -91,7 +92,7 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { /** * Deserializes the given array of bytes using the specified serializer id, - * using the optional type hint to the Serializer and the optional ClassLoader ot load it into. + * using the optional type hint to the Serializer. * Returns either the resulting object or an Exception if one was thrown. */ def deserialize[T](bytes: Array[Byte], serializerId: Int, clazz: Option[Class[_ <: T]]): Try[T] = @@ -104,9 +105,37 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { serializer.fromBinary(bytes, clazz).asInstanceOf[T] } + /** + * Deserializes the given array of bytes using the specified serializer id, + * using the optional type hint to the Serializer. + * Returns either the resulting object or an Exception if one was thrown. + */ + def deserialize(bytes: Array[Byte], serializerId: Int, manifest: String): Try[AnyRef] = + Try { + val serializer = try serializerByIdentity(serializerId) catch { + case _: NoSuchElementException ⇒ throw new NotSerializableException( + s"Cannot find serializer with id [$serializerId]. The most probable reason is that the configuration entry " + + "akka.actor.serializers is not in synch between the two systems.") + } + serializer match { + case s2: SerializerWithStringManifest ⇒ s2.fromBinary(bytes, manifest) + case s1 ⇒ + if (manifest == "") + s1.fromBinary(bytes, None) + else { + system.dynamicAccess.getClassFor[AnyRef](manifest) match { + case Success(classManifest) ⇒ + s1.fromBinary(bytes, Some(classManifest)) + case Failure(e) ⇒ + throw new NotSerializableException( + s"Cannot find manifest class [$manifest] for serializer with id [$serializerId].") + } + } + } + } + /** * Deserializes the given array of bytes using the specified type to look up what Serializer should be used. - * You can specify an optional ClassLoader to load the object into. * Returns either the resulting object or an Exception if one was thrown. */ def deserialize[T](bytes: Array[Byte], clazz: Class[T]): Try[T] = @@ -118,10 +147,8 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { * Throws akka.ConfigurationException if no `serialization-bindings` is configured for the * class of the object. */ - def findSerializerFor(o: AnyRef): Serializer = o match { - case null ⇒ NullSerializer - case other ⇒ serializerFor(other.getClass) - } + def findSerializerFor(o: AnyRef): Serializer = + if (o eq null) NullSerializer else serializerFor(o.getClass) /** * Returns the configured Serializer for the given Class. The configured Serializer @@ -205,5 +232,6 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { */ val serializerByIdentity: Map[Int, Serializer] = Map(NullSerializer.identifier -> NullSerializer) ++ serializers map { case (_, v) ⇒ (v.identifier, v) } + } diff --git a/akka-actor/src/main/scala/akka/serialization/Serializer.scala b/akka-actor/src/main/scala/akka/serialization/Serializer.scala index 429a465764..7cadd47acc 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serializer.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serializer.scala @@ -26,7 +26,7 @@ import akka.serialization.JavaSerializer.CurrentSystem * load classes using reflection. * * - * Be sure to always use the PropertyManager for loading classes! This is necessary to + * Be sure to always use the [[akka.actor.DynamicAccess]] for loading classes! This is necessary to * avoid strange match errors and inequalities which arise from different class loaders loading * the same class. */ @@ -65,6 +65,74 @@ trait Serializer { final def fromBinary(bytes: Array[Byte], clazz: Class[_]): AnyRef = fromBinary(bytes, Option(clazz)) } +/** + * A Serializer represents a bimap between an object and an array of bytes representing that object. + * + * For serialization of data that need to evolve over time the `SerializerWithStringManifest` is recommended instead + * of [[Serializer]] because the manifest (type hint) is a `String` instead of a `Class`. That means + * that the class can be moved/removed and the serializer can still deserialize old data by matching + * on the `String`. This is especially useful for Akka Persistence. + * + * The manifest string can also encode a version number that can be used in [[#fromBinary]] to + * deserialize in different ways to migrate old data to new domain objects. + * + * If the data was originally serialized with [[Serializer]] and in a later version of the + * system you change to `SerializerWithStringManifest` the manifest string will be the full class name if + * you used `includeManifest=true`, otherwise it will be the empty string. + * + * Serializers are loaded using reflection during [[akka.actor.ActorSystem]] + * start-up, where two constructors are tried in order: + * + * + * + * Be sure to always use the [[akka.actor.DynamicAccess]] for loading classes! This is necessary to + * avoid strange match errors and inequalities which arise from different class loaders loading + * the same class. + */ +abstract class SerializerWithStringManifest extends Serializer { + + /** + * Completely unique value to identify this implementation of Serializer, used to optimize network traffic. + * Values from 0 to 16 are reserved for Akka internal usage. + */ + def identifier: Int + + final override def includeManifest: Boolean = true + + /** + * Return the manifest (type hint) that will be provided in the fromBinary method. + * Use `""` if manifest is not needed. + */ + def manifest(o: AnyRef): String + + /** + * Serializes the given object into an Array of Byte + */ + def toBinary(o: AnyRef): Array[Byte] + + /** + * Produces an object from an array of bytes, with an optional type-hint; + * the class should be loaded using ActorSystem.dynamicAccess. + */ + def fromBinary(bytes: Array[Byte], manifest: String): AnyRef + + final def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = { + val manifestString = manifest match { + case Some(c) ⇒ c.getName + case None ⇒ "" + } + fromBinary(bytes, manifestString) + } + +} + /** * Base serializer trait with serialization identifiers configuration contract, * when globally unique serialization identifier is configured in the `reference.conf`. diff --git a/akka-actor/src/main/scala/akka/util/ByteString.scala b/akka-actor/src/main/scala/akka/util/ByteString.scala index 2ae54536be..4300f46123 100644 --- a/akka-actor/src/main/scala/akka/util/ByteString.scala +++ b/akka-actor/src/main/scala/akka/util/ByteString.scala @@ -7,7 +7,6 @@ package akka.util import java.io.{ ObjectInputStream, ObjectOutputStream } import java.nio.{ ByteBuffer, ByteOrder } import java.lang.{ Iterable ⇒ JIterable } - import scala.annotation.varargs import scala.collection.IndexedSeqOptimized import scala.collection.mutable.{ Builder, WrappedArray } @@ -15,6 +14,7 @@ import scala.collection.immutable import scala.collection.immutable.{ IndexedSeq, VectorBuilder } import scala.collection.generic.CanBuildFrom import scala.reflect.ClassTag +import java.nio.charset.StandardCharsets object ByteString { @@ -42,7 +42,7 @@ object ByteString { /** * Creates a new ByteString by encoding a String as UTF-8. */ - def apply(string: String): ByteString = apply(string, "UTF-8") + def apply(string: String): ByteString = apply(string, UTF_8) /** * Creates a new ByteString by encoding a String with a charset. @@ -79,6 +79,11 @@ object ByteString { */ def fromString(string: String, charset: String): ByteString = apply(string, charset) + /** + * Standard "UTF-8" charset + */ + val UTF_8: String = StandardCharsets.UTF_8.name() + /** * Creates a new ByteString by copying bytes out of a ByteBuffer. */ @@ -484,7 +489,7 @@ sealed abstract class ByteString extends IndexedSeq[Byte] with IndexedSeqOptimiz /** * Decodes this ByteString as a UTF-8 encoded String. */ - final def utf8String: String = decodeString("UTF-8") + final def utf8String: String = decodeString(ByteString.UTF_8) /** * Decodes this ByteString using a charset to produce a String. @@ -539,7 +544,7 @@ object CompactByteString { /** * Creates a new CompactByteString by encoding a String as UTF-8. */ - def apply(string: String): CompactByteString = apply(string, "UTF-8") + def apply(string: String): CompactByteString = apply(string, ByteString.UTF_8) /** * Creates a new CompactByteString by encoding a String with a charset. diff --git a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala index 9f74c76dba..6171a5f7dc 100644 --- a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala +++ b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala @@ -7,34 +7,36 @@ package akka.cluster.metrics.protobuf import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, ObjectOutputStream } import java.util.zip.{ GZIPInputStream, GZIPOutputStream } import java.{ lang ⇒ jl } - import akka.actor.{ Address, ExtendedActorSystem } import akka.cluster.metrics.protobuf.msg.{ ClusterMetricsMessages ⇒ cm } import akka.cluster.metrics.{ ClusterMetricsMessage, ClusterMetricsSettings, EWMA, Metric, MetricsGossip, MetricsGossipEnvelope, NodeMetrics } import akka.serialization.BaseSerializer import akka.util.ClassLoaderObjectInputStream import com.google.protobuf.{ ByteString, MessageLite } - import scala.annotation.tailrec import scala.collection.JavaConverters.{ asJavaIterableConverter, asScalaBufferConverter, setAsJavaSetConverter } +import akka.serialization.SerializerWithStringManifest /** * Protobuf serializer for [[akka.cluster.metrics.ClusterMetricsMessage]] types. */ -class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer { +class MessageSerializer(val system: ExtendedActorSystem) extends SerializerWithStringManifest with BaseSerializer { private final val BufferSize = 4 * 1024 - private val fromBinaryMap = collection.immutable.HashMap[Class[_ <: ClusterMetricsMessage], Array[Byte] ⇒ AnyRef]( - classOf[MetricsGossipEnvelope] -> metricsGossipEnvelopeFromBinary) + private val MetricsGossipEnvelopeManifest = "a" - override val includeManifest: Boolean = true + override def manifest(obj: AnyRef): String = obj match { + case _: MetricsGossipEnvelope ⇒ MetricsGossipEnvelopeManifest + case _ ⇒ + throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]") + } override def toBinary(obj: AnyRef): Array[Byte] = obj match { case m: MetricsGossipEnvelope ⇒ compress(metricsGossipEnvelopeToProto(m)) case _ ⇒ - throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass}") + throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]") } def compress(msg: MessageLite): Array[Byte] = { @@ -61,12 +63,10 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer out.toByteArray } - def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = clazz match { - case Some(c) ⇒ fromBinaryMap.get(c.asInstanceOf[Class[ClusterMetricsMessage]]) match { - case Some(f) ⇒ f(bytes) - case None ⇒ throw new IllegalArgumentException(s"Unimplemented deserialization of message class $c in metrics") - } - case _ ⇒ throw new IllegalArgumentException("Need a metrics message class to be able to deserialize bytes in metrics") + override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match { + case MetricsGossipEnvelopeManifest ⇒ metricsGossipEnvelopeFromBinary(bytes) + case _ ⇒ throw new IllegalArgumentException( + s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}") } private def addressToProto(address: Address): cm.Address.Builder = address match { diff --git a/akka-cluster-metrics/src/test/scala/akka/cluster/metrics/protobuf/MessageSerializerSpec.scala b/akka-cluster-metrics/src/test/scala/akka/cluster/metrics/protobuf/MessageSerializerSpec.scala index 1f47016bdb..e68b9f8b94 100644 --- a/akka-cluster-metrics/src/test/scala/akka/cluster/metrics/protobuf/MessageSerializerSpec.scala +++ b/akka-cluster-metrics/src/test/scala/akka/cluster/metrics/protobuf/MessageSerializerSpec.scala @@ -24,7 +24,7 @@ class MessageSerializerSpec extends AkkaSpec( def checkSerialization(obj: AnyRef): Unit = { val blob = serializer.toBinary(obj) - val ref = serializer.fromBinary(blob, obj.getClass) + val ref = serializer.fromBinary(blob, serializer.manifest(obj)) obj match { case _ ⇒ ref should ===(obj) diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala b/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala index 770f572742..77a9d8bb24 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala @@ -27,40 +27,57 @@ import akka.serialization.Serialization import akka.actor.ActorRef import akka.serialization.SerializationExtension import scala.collection.immutable.TreeMap +import akka.serialization.SerializerWithStringManifest /** * Protobuf serializer of DistributedPubSubMediator messages. */ -class DistributedPubSubMessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer { +class DistributedPubSubMessageSerializer(val system: ExtendedActorSystem) + extends SerializerWithStringManifest with BaseSerializer { + + private lazy val serialization = SerializationExtension(system) private final val BufferSize = 1024 * 4 - private val fromBinaryMap = collection.immutable.HashMap[Class[_ <: DistributedPubSubMessage], Array[Byte] ⇒ AnyRef]( - classOf[Status] -> statusFromBinary, - classOf[Delta] -> deltaFromBinary, - classOf[Send] -> sendFromBinary, - classOf[SendToAll] -> sendToAllFromBinary, - classOf[Publish] -> publishFromBinary) + private val StatusManifest = "A" + private val DeltaManifest = "B" + private val SendManifest = "C" + private val SendToAllManifest = "D" + private val PublishManifest = "E" - def includeManifest: Boolean = true + private val fromBinaryMap = collection.immutable.HashMap[String, Array[Byte] ⇒ AnyRef]( + StatusManifest -> statusFromBinary, + DeltaManifest -> deltaFromBinary, + SendManifest -> sendFromBinary, + SendToAllManifest -> sendToAllFromBinary, + PublishManifest -> publishFromBinary) - def toBinary(obj: AnyRef): Array[Byte] = obj match { + override def manifest(obj: AnyRef): String = obj match { + case _: Status ⇒ StatusManifest + case _: Delta ⇒ DeltaManifest + case _: Send ⇒ SendManifest + case _: SendToAll ⇒ SendToAllManifest + case _: Publish ⇒ PublishManifest + case _ ⇒ + throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]") + } + + override def toBinary(obj: AnyRef): Array[Byte] = obj match { case m: Status ⇒ compress(statusToProto(m)) case m: Delta ⇒ compress(deltaToProto(m)) case m: Send ⇒ sendToProto(m).toByteArray case m: SendToAll ⇒ sendToAllToProto(m).toByteArray case m: Publish ⇒ publishToProto(m).toByteArray case _ ⇒ - throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass}") + throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]") } - def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = clazz match { - case Some(c) ⇒ fromBinaryMap.get(c.asInstanceOf[Class[DistributedPubSubMessage]]) match { + override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = + fromBinaryMap.get(manifest) match { case Some(f) ⇒ f(bytes) - case None ⇒ throw new IllegalArgumentException(s"Unimplemented deserialization of message class $c in DistributedPubSubMessageSerializer") + case None ⇒ throw new IllegalArgumentException( + s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]") } - case _ ⇒ throw new IllegalArgumentException("Need a message class to be able to deserialize bytes in DistributedPubSubMessageSerializer") - } def compress(msg: MessageLite): Array[Byte] = { val bos = new ByteArrayOutputStream(BufferSize) @@ -189,21 +206,30 @@ class DistributedPubSubMessageSerializer(val system: ExtendedActorSystem) extend private def payloadToProto(msg: Any): dm.Payload = { val m = msg.asInstanceOf[AnyRef] - val msgSerializer = SerializationExtension(system).findSerializerFor(m) + val msgSerializer = serialization.findSerializerFor(m) val builder = dm.Payload.newBuilder(). setEnclosedMessage(ByteString.copyFrom(msgSerializer.toBinary(m))) .setSerializerId(msgSerializer.identifier) - if (msgSerializer.includeManifest) - builder.setMessageManifest(ByteString.copyFromUtf8(m.getClass.getName)) + + msgSerializer match { + case ser2: SerializerWithStringManifest ⇒ + val manifest = ser2.manifest(m) + if (manifest != "") + builder.setMessageManifest(ByteString.copyFromUtf8(manifest)) + case _ ⇒ + if (msgSerializer.includeManifest) + builder.setMessageManifest(ByteString.copyFromUtf8(m.getClass.getName)) + } + builder.build() } private def payloadFromProto(payload: dm.Payload): AnyRef = { - SerializationExtension(system).deserialize( + val manifest = if (payload.hasMessageManifest) payload.getMessageManifest.toStringUtf8 else "" + serialization.deserialize( payload.getEnclosedMessage.toByteArray, payload.getSerializerId, - if (payload.hasMessageManifest) - Some(system.dynamicAccess.getClassFor[AnyRef](payload.getMessageManifest.toStringUtf8).get) else None).get + manifest).get } } diff --git a/akka-cluster-tools/src/test/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializerSpec.scala b/akka-cluster-tools/src/test/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializerSpec.scala index 43830d7ab4..6925b67e55 100644 --- a/akka-cluster-tools/src/test/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializerSpec.scala +++ b/akka-cluster-tools/src/test/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializerSpec.scala @@ -17,7 +17,7 @@ class DistributedPubSubMessageSerializerSpec extends AkkaSpec { def checkSerialization(obj: AnyRef): Unit = { val blob = serializer.toBinary(obj) - val ref = serializer.fromBinary(blob, obj.getClass) + val ref = serializer.fromBinary(blob, serializer.manifest(obj)) ref should ===(obj) } diff --git a/akka-docs/rst/java/code/docs/serialization/SerializationDocTest.java b/akka-docs/rst/java/code/docs/serialization/SerializationDocTest.java index 8aa1518064..7696b2bb4e 100644 --- a/akka-docs/rst/java/code/docs/serialization/SerializationDocTest.java +++ b/akka-docs/rst/java/code/docs/serialization/SerializationDocTest.java @@ -3,9 +3,14 @@ */ package docs.serialization; +import java.io.UnsupportedEncodingException; + import akka.testkit.JavaTestKit; + import org.junit.Test; import static org.junit.Assert.*; +import java.nio.charset.StandardCharsets; + //#imports import akka.actor.*; import akka.serialization.*; @@ -49,6 +54,79 @@ public class SerializationDocTest { } //#my-own-serializer + static class Customer { + public final String name; + + Customer(String name) { + this.name = name; + } + } + + static class User { + public final String name; + + User(String name) { + this.name = name; + } + } + + static + //#my-own-serializer2 + public class MyOwnSerializer2 extends SerializerWithStringManifest { + + private static final String CUSTOMER_MANIFEST = "customer"; + private static final String USER_MANIFEST = "user"; + private static final String UTF_8 = StandardCharsets.UTF_8.name(); + + // Pick a unique identifier for your Serializer, + // you've got a couple of billions to choose from, + // 0 - 16 is reserved by Akka itself + @Override public int identifier() { + return 1234567; + } + + @Override public String manifest(Object obj) { + if (obj instanceof Customer) + return CUSTOMER_MANIFEST; + else if (obj instanceof User) + return USER_MANIFEST; + else + throw new IllegalArgumentException("Unknow type: " + obj); + } + + // "toBinary" serializes the given object to an Array of Bytes + @Override public byte[] toBinary(Object obj) { + // Put the real code that serializes the object here + try { + if (obj instanceof Customer) + return ((Customer) obj).name.getBytes(UTF_8); + else if (obj instanceof User) + return ((User) obj).name.getBytes(UTF_8); + else + throw new IllegalArgumentException("Unknow type: " + obj); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + // "fromBinary" deserializes the given array, + // using the type hint + @Override public Object fromBinary(byte[] bytes, String manifest) { + // Put the real code that deserializes here + try { + if (manifest.equals(CUSTOMER_MANIFEST)) + return new Customer(new String(bytes, UTF_8)); + else if (manifest.equals(USER_MANIFEST)) + return new User(new String(bytes, UTF_8)); + else + throw new IllegalArgumentException("Unknow manifest: " + manifest); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e.getMessage(), e); + } + } + } +//#my-own-serializer2 + @Test public void serializeActorRefs() { final ExtendedActorSystem extendedSystem = (ExtendedActorSystem) ActorSystem.create("whatever"); diff --git a/akka-docs/rst/java/serialization.rst b/akka-docs/rst/java/serialization.rst index 1706df00b1..7b60c5ee86 100644 --- a/akka-docs/rst/java/serialization.rst +++ b/akka-docs/rst/java/serialization.rst @@ -103,9 +103,38 @@ which is done by extending ``akka.serialization.JSerializer``, like this: :include: my-own-serializer :exclude: ... +The manifest is a type hint so that the same serializer can be used for different +classes. The manifest parameter in ``fromBinaryJava`` is the class of the object that +was serialized. In ``fromBinary`` you can match on the class and deserialize the +bytes to different objects. + Then you only need to fill in the blanks, bind it to a name in your :ref:`configuration` and then list which classes that should be serialized using it. +Serializer with String Manifest +------------------------------- + +The ``Serializer`` illustrated above supports a class based manifest (type hint). +For serialization of data that need to evolve over time the `SerializerWithStringManifest` +is recommended instead of ``Serializer`` because the manifest (type hint) is a ``String`` +instead of a ``Class``. That means that the class can be moved/removed and the serializer +can still deserialize old data by matching on the ``String``. This is especially useful +for :ref:`persistence-java`. + +The manifest string can also encode a version number that can be used in ``fromBinary`` to +deserialize in different ways to migrate old data to new domain objects. + +If the data was originally serialized with ``Serializer`` and in a later version of the +system you change to ``SerializerWithStringManifest`` the manifest string will be the full +class name if you used ``includeManifest=true``, otherwise it will be the empty string. + +This is how a ``SerializerWithStringManifest`` looks like: + +.. includecode:: code/docs/serialization/SerializationDocTest.java#my-own-serializer2 + +You must also bind it to a name in your :ref:`configuration` and then list which classes +that should be serialized using it. + Serializing ActorRefs --------------------- diff --git a/akka-docs/rst/scala/code/docs/serialization/SerializationDocSpec.scala b/akka-docs/rst/scala/code/docs/serialization/SerializationDocSpec.scala index d308e45707..638e88d7d0 100644 --- a/akka-docs/rst/scala/code/docs/serialization/SerializationDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/serialization/SerializationDocSpec.scala @@ -15,12 +15,13 @@ package docs.serialization { import akka.actor.ExtendedActorSystem import akka.actor.Extension import akka.actor.Address + import java.nio.charset.StandardCharsets //#my-own-serializer class MyOwnSerializer extends Serializer { // This is whether "fromBinary" requires a "clazz" or not - def includeManifest: Boolean = false + def includeManifest: Boolean = true // Pick a unique identifier for your Serializer, // you've got a couple of billions to choose from, @@ -37,7 +38,6 @@ package docs.serialization { // "fromBinary" deserializes the given array, // using the type hint (if any, see "includeManifest" above) - // into the optionally provided classLoader. def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = { // Put your code that deserializes here @@ -48,8 +48,52 @@ package docs.serialization { } //#my-own-serializer + //#my-own-serializer2 + class MyOwnSerializer2 extends SerializerWithStringManifest { + + val CustomerManifest = "customer" + val UserManifest = "user" + val UTF_8 = StandardCharsets.UTF_8.name() + + // Pick a unique identifier for your Serializer, + // you've got a couple of billions to choose from, + // 0 - 16 is reserved by Akka itself + def identifier = 1234567 + + // The manifest (type hint) that will be provided in the fromBinary method + // Use `""` if manifest is not needed. + def manifest(obj: AnyRef): String = + obj match { + case _: Customer => CustomerManifest + case _: User => UserManifest + } + + // "toBinary" serializes the given object to an Array of Bytes + def toBinary(obj: AnyRef): Array[Byte] = { + // Put the real code that serializes the object here + obj match { + case Customer(name) => name.getBytes(UTF_8) + case User(name) => name.getBytes(UTF_8) + } + } + + // "fromBinary" deserializes the given array, + // using the type hint + def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = { + // Put the real code that deserializes here + manifest match { + case CustomerManifest => + Customer(new String(bytes, UTF_8)) + case UserManifest => + User(new String(bytes, UTF_8)) + } + } + } + //#my-own-serializer2 + trait MyOwnSerializable final case class Customer(name: String) extends MyOwnSerializable + final case class User(name: String) extends MyOwnSerializable class SerializationDocSpec extends AkkaSpec { "demonstrate configuration of serialize messages" in { diff --git a/akka-docs/rst/scala/serialization.rst b/akka-docs/rst/scala/serialization.rst index 758b8c92a8..d4d1da4301 100644 --- a/akka-docs/rst/scala/serialization.rst +++ b/akka-docs/rst/scala/serialization.rst @@ -95,9 +95,38 @@ First you need to create a class definition of your ``Serializer`` like so: :include: imports,my-own-serializer :exclude: ... +The manifest is a type hint so that the same serializer can be used for different +classes. The manifest parameter in ``fromBinary`` is the class of the object that +was serialized. In ``fromBinary`` you can match on the class and deserialize the +bytes to different objects. + Then you only need to fill in the blanks, bind it to a name in your :ref:`configuration` and then list which classes that should be serialized using it. +Serializer with String Manifest +------------------------------- + +The ``Serializer`` illustrated above supports a class based manifest (type hint). +For serialization of data that need to evolve over time the `SerializerWithStringManifest` +is recommended instead of ``Serializer`` because the manifest (type hint) is a ``String`` +instead of a ``Class``. That means that the class can be moved/removed and the serializer +can still deserialize old data by matching on the ``String``. This is especially useful +for :ref:`persistence-scala`. + +The manifest string can also encode a version number that can be used in ``fromBinary`` to +deserialize in different ways to migrate old data to new domain objects. + +If the data was originally serialized with ``Serializer`` and in a later version of the +system you change to ``SerializerWithStringManifest`` the manifest string will be the full +class name if you used ``includeManifest=true``, otherwise it will be the empty string. + +This is how a ``SerializerWithStringManifest`` looks like: + +.. includecode:: code/docs/serialization/SerializationDocSpec.scala#my-own-serializer2 + +You must also bind it to a name in your :ref:`configuration` and then list which classes +that should be serialized using it. + Serializing ActorRefs --------------------- diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbIdMapping.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbIdMapping.scala index 87e1e54a10..6fbec39b0a 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbIdMapping.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbIdMapping.scala @@ -7,6 +7,7 @@ package akka.persistence.journal.leveldb import org.iq80.leveldb.DBIterator import akka.actor.Actor +import akka.util.ByteString.UTF_8 /** * INTERNAL API. @@ -38,7 +39,7 @@ private[persistence] trait LeveldbIdMapping extends Actor { this: LeveldbStore val nextEntry = iter.next() val nextKey = keyFromBytes(nextEntry.getKey) if (!isMappingKey(nextKey)) pathMap else { - val nextVal = new String(nextEntry.getValue, "UTF-8") + val nextVal = new String(nextEntry.getValue, UTF_8) readIdMap(pathMap + (nextVal -> nextKey.mappingId), iter) } } @@ -46,7 +47,7 @@ private[persistence] trait LeveldbIdMapping extends Actor { this: LeveldbStore private def writeIdMapping(id: String, numericId: Int): Int = { idMap = idMap + (id -> numericId) - leveldb.put(keyToBytes(mappingKey(numericId)), id.getBytes("UTF-8")) + leveldb.put(keyToBytes(mappingKey(numericId)), id.getBytes(UTF_8)) numericId } diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala index 198e217baf..986ecfa259 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala @@ -30,6 +30,8 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer val PersistentImplClass = classOf[PersistentImpl] val AtLeastOnceDeliverySnapshotClass = classOf[AtLeastOnceDeliverySnap] + private lazy val serialization = SerializationExtension(system) + override val includeManifest: Boolean = true private lazy val transportInformation: Option[Serialization.Information] = { @@ -107,10 +109,18 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer private def persistentPayloadBuilder(payload: AnyRef) = { def payloadBuilder() = { - val serializer = SerializationExtension(system).findSerializerFor(payload) + val serializer = serialization.findSerializerFor(payload) val builder = PersistentPayload.newBuilder() - if (serializer.includeManifest) builder.setPayloadManifest(ByteString.copyFromUtf8(payload.getClass.getName)) + serializer match { + case ser2: SerializerWithStringManifest ⇒ + val manifest = ser2.manifest(payload) + if (manifest != "") + builder.setPayloadManifest(ByteString.copyFromUtf8(manifest)) + case _ ⇒ + if (serializer.includeManifest) + builder.setPayloadManifest(ByteString.copyFromUtf8(payload.getClass.getName)) + } builder.setPayload(ByteString.copyFrom(serializer.toBinary(payload))) builder.setSerializerId(serializer.identifier) @@ -138,13 +148,13 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer } private def payload(persistentPayload: PersistentPayload): Any = { - val payloadClass = if (persistentPayload.hasPayloadManifest) - Some(system.dynamicAccess.getClassFor[AnyRef](persistentPayload.getPayloadManifest.toStringUtf8).get) else None + val manifest = if (persistentPayload.hasPayloadManifest) + persistentPayload.getPayloadManifest.toStringUtf8 else "" - SerializationExtension(system).deserialize( + serialization.deserialize( persistentPayload.getPayload.toByteArray, persistentPayload.getSerializerId, - payloadClass).get + manifest).get } } diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala index 0665f06688..ffda65e83f 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala @@ -8,6 +8,7 @@ package akka.persistence.serialization import java.io._ import akka.actor._ import akka.serialization._ +import akka.util.ByteString.UTF_8 import scala.util.Success import scala.util.Failure @@ -33,6 +34,8 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer override val includeManifest: Boolean = false + private lazy val serialization = SerializationExtension(system) + private lazy val transportInformation: Option[Serialization.Information] = { val address = system.provider.getDefaultAddress if (address.hasLocalScope) None @@ -57,14 +60,21 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer private def snapshotToBinary(snapshot: AnyRef): Array[Byte] = { def serialize() = { - val extension = SerializationExtension(system) - - val snapshotSerializer = extension.findSerializerFor(snapshot) + val snapshotSerializer = serialization.findSerializerFor(snapshot) val headerOut = new ByteArrayOutputStream writeInt(headerOut, snapshotSerializer.identifier) - if (snapshotSerializer.includeManifest) - headerOut.write(snapshot.getClass.getName.getBytes("utf-8")) + + snapshotSerializer match { + case ser2: SerializerWithStringManifest ⇒ + val manifest = ser2.manifest(snapshot) + if (manifest != "") + headerOut.write(manifest.getBytes(UTF_8)) + case _ ⇒ + if (snapshotSerializer.includeManifest) + headerOut.write(snapshot.getClass.getName.getBytes(UTF_8)) + } + val headerBytes = headerOut.toByteArray val out = new ByteArrayOutputStream @@ -84,8 +94,6 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer } private def snapshotFromBinary(bytes: Array[Byte]): AnyRef = { - val extension = SerializationExtension(system) - val in = new ByteArrayInputStream(bytes) val headerLength = readInt(in) val headerBytes = bytes.slice(4, headerLength + 4) @@ -122,7 +130,7 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer val oldHeader = if (readShort(in) == 0xedac) { // Java Serialization magic value with swapped bytes val b = if (SnapshotSerializer.doPatch) patch(headerBytes) else headerBytes - extension.deserialize(b, classOf[SnapshotHeader]).toOption + serialization.deserialize(b, classOf[SnapshotHeader]).toOption } else None val header = oldHeader.getOrElse { @@ -134,13 +142,12 @@ class SnapshotSerializer(val system: ExtendedActorSystem) extends BaseSerializer else { val manifestBytes = Array.ofDim[Byte](remaining) headerIn.read(manifestBytes) - Some(new String(manifestBytes, "utf-8")) + Some(new String(manifestBytes, UTF_8)) } SnapshotHeader(serializerId, manifest) } - val manifest = header.manifest.map(system.dynamicAccess.getClassFor[AnyRef](_).get) - extension.deserialize[AnyRef](snapshotBytes, header.serializerId, manifest).get + serialization.deserialize(snapshotBytes, header.serializerId, header.manifest.getOrElse("")).get } private def writeInt(outputStream: OutputStream, i: Int) = @@ -205,4 +212,4 @@ object SnapshotSerializer { } else false } else false } -} \ No newline at end of file +} diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala index 2f88fc4dcd..7b5c8e2792 100644 --- a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala @@ -17,6 +17,7 @@ import akka.persistence._ import akka.persistence.snapshot._ import akka.persistence.serialization._ import akka.serialization.SerializationExtension +import akka.util.ByteString.UTF_8 /** * INTERNAL API. @@ -102,13 +103,13 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo try { p(stream) } finally { stream.close() } private def snapshotFile(metadata: SnapshotMetadata, extension: String = ""): File = - new File(snapshotDir, s"snapshot-${URLEncoder.encode(metadata.persistenceId, "UTF-8")}-${metadata.sequenceNr}-${metadata.timestamp}${extension}") + new File(snapshotDir, s"snapshot-${URLEncoder.encode(metadata.persistenceId, UTF_8)}-${metadata.sequenceNr}-${metadata.timestamp}${extension}") private def snapshotMetadata(persistenceId: String, criteria: SnapshotSelectionCriteria): immutable.Seq[SnapshotMetadata] = { val files = snapshotDir.listFiles(new SnapshotFilenameFilter(persistenceId)) if (files eq null) Nil // if the dir was removed else files.map(_.getName).collect { - case FilenamePattern(pid, snr, tms) ⇒ SnapshotMetadata(URLDecoder.decode(pid, "UTF-8"), snr.toLong, tms.toLong) + case FilenamePattern(pid, snr, tms) ⇒ SnapshotMetadata(URLDecoder.decode(pid, UTF_8), snr.toLong, tms.toLong) }.filter(md ⇒ criteria.matches(md) && !saving.contains(md)).toVector } diff --git a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala index aecadcc105..36c42869fa 100644 --- a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala @@ -10,24 +10,34 @@ import akka.actor._ import akka.persistence._ import akka.serialization._ import akka.testkit._ - import akka.persistence.AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot import akka.persistence.AtLeastOnceDelivery.UnconfirmedDelivery +import akka.util.ByteString.UTF_8 import scala.concurrent.Await import scala.concurrent.duration.Duration import org.apache.commons.codec.binary.Hex.decodeHex +import SerializerSpecConfigs._ + object SerializerSpecConfigs { val customSerializers = ConfigFactory.parseString( """ akka.actor { serializers { my-payload = "akka.persistence.serialization.MyPayloadSerializer" + my-payload2 = "akka.persistence.serialization.MyPayload2Serializer" my-snapshot = "akka.persistence.serialization.MySnapshotSerializer" + my-snapshot2 = "akka.persistence.serialization.MySnapshotSerializer2" + old-payload = "akka.persistence.serialization.OldPayloadSerializer" } serialization-bindings { "akka.persistence.serialization.MyPayload" = my-payload + "akka.persistence.serialization.MyPayload2" = my-payload2 "akka.persistence.serialization.MySnapshot" = my-snapshot + "akka.persistence.serialization.MySnapshot2" = my-snapshot2 + # this entry was used when creating the data for the test + # "deserialize data when class is removed" + #"akka.persistence.serialization.OldPayload" = old-payload } } """) @@ -53,6 +63,7 @@ object SerializerSpecConfigs { def config(configs: String*): Config = configs.foldLeft(ConfigFactory.empty)((r, c) ⇒ r.withFallback(ConfigFactory.parseString(c))) + } import SerializerSpecConfigs._ @@ -71,9 +82,19 @@ class SnapshotSerializerPersistenceSpec extends AkkaSpec(customSerializers) { deserialized should ===(Snapshot(MySnapshot(".a."))) } + "handle custom snapshot Serialization with string manifest" in { + val wrapped = Snapshot(MySnapshot2("a")) + val serializer = serialization.findSerializerFor(wrapped) + + val bytes = serializer.toBinary(wrapped) + val deserialized = serializer.fromBinary(bytes, None) + + deserialized should ===(Snapshot(MySnapshot2(".a."))) + } + "be able to read snapshot created with akka 2.3.6 and Scala 2.10" in { val dataStr = "abc" - val snapshot = Snapshot(dataStr.getBytes("utf-8")) + val snapshot = Snapshot(dataStr.getBytes(UTF_8)) val serializer = serialization.findSerializerFor(snapshot) // the oldSnapshot was created with Akka 2.3.6 and it is using JavaSerialization @@ -91,13 +112,13 @@ class SnapshotSerializerPersistenceSpec extends AkkaSpec(customSerializers) { val bytes = decodeHex(oldSnapshot.toCharArray) val deserialized = serializer.fromBinary(bytes, None).asInstanceOf[Snapshot] - val deserializedDataStr = new String(deserialized.data.asInstanceOf[Array[Byte]], "utf-8") + val deserializedDataStr = new String(deserialized.data.asInstanceOf[Array[Byte]], UTF_8) dataStr should ===(deserializedDataStr) } "be able to read snapshot created with akka 2.3.6 and Scala 2.11" in { val dataStr = "abc" - val snapshot = Snapshot(dataStr.getBytes("utf-8")) + val snapshot = Snapshot(dataStr.getBytes(UTF_8)) val serializer = serialization.findSerializerFor(snapshot) // the oldSnapshot was created with Akka 2.3.6 and it is using JavaSerialization @@ -115,7 +136,7 @@ class SnapshotSerializerPersistenceSpec extends AkkaSpec(customSerializers) { val bytes = decodeHex(oldSnapshot.toCharArray) val deserialized = serializer.fromBinary(bytes, None).asInstanceOf[Snapshot] - val deserializedDataStr = new String(deserialized.data.asInstanceOf[Array[Byte]], "utf-8") + val deserializedDataStr = new String(deserialized.data.asInstanceOf[Array[Byte]], UTF_8) dataStr should ===(deserializedDataStr) } } @@ -136,6 +157,7 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) { deserialized should ===(persistent.withPayload(MyPayload(".a."))) } } + "given a PersistentRepr manifest" must { "handle custom Persistent message serialization" in { val persistent = PersistentRepr(MyPayload("b"), 13, "p1", true, testActor) @@ -148,6 +170,57 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) { } } + "given payload serializer with string manifest" must { + "handle serialization" in { + val persistent = PersistentRepr(MyPayload2("a", 17), 13, "p1", true, testActor) + val serializer = serialization.findSerializerFor(persistent) + + val bytes = serializer.toBinary(persistent) + val deserialized = serializer.fromBinary(bytes, None) + + deserialized should ===(persistent.withPayload(MyPayload2(".a.", 17))) + } + + "be able to evolve the data types" in { + val oldEvent = MyPayload("a") + val serializer1 = serialization.findSerializerFor(oldEvent) + val bytes = serializer1.toBinary(oldEvent) + + // now the system is updated to version 2 with new class MyPayload2 + // and MyPayload2Serializer that handles migration from old MyPayload + val serializer2 = serialization.serializerFor(classOf[MyPayload2]) + val deserialized = serializer2.fromBinary(bytes, Some(oldEvent.getClass)) + + deserialized should be(MyPayload2(".a.", 0)) + } + + "be able to deserialize data when class is removed" in { + val serializer = serialization.findSerializerFor(PersistentRepr("x", 13, "p1", true, testActor)) + + // It was created with: + // val old = PersistentRepr(OldPayload('A'), 13, "p1", true, testActor) + // import org.apache.commons.codec.binary.Hex._ + // println(s"encoded OldPayload: " + String.valueOf(encodeHex(serializer.toBinary(old)))) + // + val oldData = + "0a3e08c7da04120d4f6c645061796c6f61642841291a2" + + "9616b6b612e70657273697374656e63652e7365726961" + + "6c697a6174696f6e2e4f6c645061796c6f6164100d1a0" + + "2703120015a45616b6b613a2f2f4d6573736167655365" + + "7269616c697a657250657273697374656e63655370656" + + "32f73797374656d2f746573744163746f722d31233133" + + "3137373931343033" + + // now the system is updated, OldPayload is replaced by MyPayload, and the + // OldPayloadSerializer is adjusted to migrate OldPayload + val bytes = decodeHex(oldData.toCharArray) + + val deserialized = serializer.fromBinary(bytes, None).asInstanceOf[PersistentRepr] + + deserialized.payload should be(MyPayload("OldPayload(A)")) + } + } + "given AtLeastOnceDeliverySnapshot" must { "handle empty unconfirmed" in { val unconfirmed = Vector.empty @@ -173,7 +246,6 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) { deserialized should ===(snap) } - } } @@ -222,7 +294,13 @@ class MessageSerializerRemotingSpec extends AkkaSpec(remote.withFallback(customS } final case class MyPayload(data: String) +final case class MyPayload2(data: String, n: Int) final case class MySnapshot(data: String) +final case class MySnapshot2(data: String) + +// this class was used when creating the data for the test +// "deserialize data when class is removed" +//final case class OldPayload(c: Char) class MyPayloadSerializer extends Serializer { val MyPayloadClass = classOf[MyPayload] @@ -231,16 +309,41 @@ class MyPayloadSerializer extends Serializer { def includeManifest: Boolean = true def toBinary(o: AnyRef): Array[Byte] = o match { - case MyPayload(data) ⇒ s".${data}".getBytes("UTF-8") + case MyPayload(data) ⇒ s".${data}".getBytes(UTF_8) } def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = manifest match { - case Some(MyPayloadClass) ⇒ MyPayload(s"${new String(bytes, "UTF-8")}.") + case Some(MyPayloadClass) ⇒ MyPayload(s"${new String(bytes, UTF_8)}.") case Some(c) ⇒ throw new Exception(s"unexpected manifest ${c}") case None ⇒ throw new Exception("no manifest") } } +class MyPayload2Serializer extends SerializerWithStringManifest { + val MyPayload2Class = classOf[MyPayload] + + val ManifestV1 = classOf[MyPayload].getName + val ManifestV2 = "MyPayload-V2" + + def identifier: Int = 77125 + + def manifest(o: AnyRef): String = ManifestV2 + + def toBinary(o: AnyRef): Array[Byte] = o match { + case MyPayload2(data, n) ⇒ s".$data:$n".getBytes(UTF_8) + } + + def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match { + case ManifestV2 ⇒ + val parts = new String(bytes, UTF_8).split(":") + MyPayload2(data = parts(0) + ".", n = parts(1).toInt) + case ManifestV1 ⇒ + MyPayload2(data = s"${new String(bytes, UTF_8)}.", n = 0) + case other ⇒ + throw new Exception(s"unexpected manifest [$other]") + } +} + class MySnapshotSerializer extends Serializer { val MySnapshotClass = classOf[MySnapshot] @@ -248,12 +351,55 @@ class MySnapshotSerializer extends Serializer { def includeManifest: Boolean = true def toBinary(o: AnyRef): Array[Byte] = o match { - case MySnapshot(data) ⇒ s".${data}".getBytes("UTF-8") + case MySnapshot(data) ⇒ s".${data}".getBytes(UTF_8) } def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = manifest match { - case Some(MySnapshotClass) ⇒ MySnapshot(s"${new String(bytes, "UTF-8")}.") + case Some(MySnapshotClass) ⇒ MySnapshot(s"${new String(bytes, UTF_8)}.") case Some(c) ⇒ throw new Exception(s"unexpected manifest ${c}") case None ⇒ throw new Exception("no manifest") } } + +class MySnapshotSerializer2 extends SerializerWithStringManifest { + val CurrentManifest = "MySnapshot-V2" + val OldManifest = classOf[MySnapshot].getName + + def identifier: Int = 77126 + + def manifest(o: AnyRef): String = CurrentManifest + + def toBinary(o: AnyRef): Array[Byte] = o match { + case MySnapshot2(data) ⇒ s".${data}".getBytes(UTF_8) + } + + def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match { + case CurrentManifest | OldManifest ⇒ + MySnapshot2(s"${new String(bytes, UTF_8)}.") + case other ⇒ + throw new Exception(s"unexpected manifest [$other]") + } +} + +class OldPayloadSerializer extends SerializerWithStringManifest { + + def identifier: Int = 77127 + val OldPayloadClassName = "akka.persistence.serialization.OldPayload" + val MyPayloadClassName = classOf[MyPayload].getName + + def manifest(o: AnyRef): String = o.getClass.getName + + def toBinary(o: AnyRef): Array[Byte] = o match { + case MyPayload(data) ⇒ s".${data}".getBytes(UTF_8) + case old if old.getClass.getName == OldPayloadClassName ⇒ + o.toString.getBytes(UTF_8) + } + + def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match { + case OldPayloadClassName ⇒ + MyPayload(new String(bytes, UTF_8)) + case MyPayloadClassName ⇒ MyPayload(s"${new String(bytes, UTF_8)}.") + case other ⇒ + throw new Exception(s"unexpected manifest [$other]") + } +} diff --git a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala index 71fee373f6..2f70eed317 100644 --- a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala @@ -8,6 +8,7 @@ import akka.remote.WireFormats._ import com.google.protobuf.ByteString import akka.actor.ExtendedActorSystem import akka.serialization.SerializationExtension +import akka.serialization.SerializerWithStringManifest /** * INTERNAL API @@ -23,7 +24,7 @@ private[akka] object MessageSerializer { SerializationExtension(system).deserialize( messageProtocol.getMessage.toByteArray, messageProtocol.getSerializerId, - if (messageProtocol.hasMessageManifest) Some(system.dynamicAccess.getClassFor[AnyRef](messageProtocol.getMessageManifest.toStringUtf8).get) else None).get + if (messageProtocol.hasMessageManifest) messageProtocol.getMessageManifest.toStringUtf8 else "").get } /** @@ -35,8 +36,15 @@ private[akka] object MessageSerializer { val builder = SerializedMessage.newBuilder builder.setMessage(ByteString.copyFrom(serializer.toBinary(message))) builder.setSerializerId(serializer.identifier) - if (serializer.includeManifest) - builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName)) + serializer match { + case ser2: SerializerWithStringManifest ⇒ + val manifest = ser2.manifest(message) + if (manifest != "") + builder.setMessageManifest(ByteString.copyFromUtf8(manifest)) + case _ ⇒ + if (serializer.includeManifest) + builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName)) + } builder.build } } diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 1f5465d136..0066d05ab9 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -22,12 +22,13 @@ import scala.util.{ Failure, Success } import akka.remote.transport.AkkaPduCodec.Message import java.util.concurrent.ConcurrentHashMap import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } +import akka.util.ByteString.UTF_8 /** * INTERNAL API */ private[remote] object AddressUrlEncoder { - def apply(address: Address): String = URLEncoder.encode(address.toString, "utf-8") + def apply(address: Address): String = URLEncoder.encode(address.toString, UTF_8) } /** diff --git a/akka-remote/src/main/scala/akka/remote/serialization/MessageContainerSerializer.scala b/akka-remote/src/main/scala/akka/remote/serialization/MessageContainerSerializer.scala index ced74c719e..fca385685a 100644 --- a/akka-remote/src/main/scala/akka/remote/serialization/MessageContainerSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/serialization/MessageContainerSerializer.scala @@ -14,12 +14,15 @@ import akka.actor.SelectionPathElement import akka.remote.ContainerFormats import akka.serialization.SerializationExtension import akka.serialization.BaseSerializer +import akka.serialization.SerializerWithStringManifest class MessageContainerSerializer(val system: ExtendedActorSystem) extends BaseSerializer { @deprecated("Use constructor with ExtendedActorSystem", "2.4") def this() = this(null) + private lazy val serialization = SerializationExtension(system) + // TODO remove this when deprecated this() is removed override val identifier: Int = if (system eq null) 6 @@ -37,14 +40,21 @@ class MessageContainerSerializer(val system: ExtendedActorSystem) extends BaseSe private def serializeSelection(sel: ActorSelectionMessage): Array[Byte] = { val builder = ContainerFormats.SelectionEnvelope.newBuilder() val message = sel.msg.asInstanceOf[AnyRef] - val serializer = SerializationExtension(system).findSerializerFor(message) + val serializer = serialization.findSerializerFor(message) builder. setEnclosedMessage(ByteString.copyFrom(serializer.toBinary(message))). setSerializerId(serializer.identifier). setWildcardFanOut(sel.wildcardFanOut) - if (serializer.includeManifest) - builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName)) + serializer match { + case ser2: SerializerWithStringManifest ⇒ + val manifest = ser2.manifest(message) + if (manifest != "") + builder.setMessageManifest(ByteString.copyFromUtf8(manifest)) + case _ ⇒ + if (serializer.includeManifest) + builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName)) + } sel.elements.foreach { case SelectChildName(name) ⇒ @@ -66,11 +76,11 @@ class MessageContainerSerializer(val system: ExtendedActorSystem) extends BaseSe def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = { val selectionEnvelope = ContainerFormats.SelectionEnvelope.parseFrom(bytes) - val msg = SerializationExtension(system).deserialize( + val manifest = if (selectionEnvelope.hasMessageManifest) selectionEnvelope.getMessageManifest.toStringUtf8 else "" + val msg = serialization.deserialize( selectionEnvelope.getEnclosedMessage.toByteArray, selectionEnvelope.getSerializerId, - if (selectionEnvelope.hasMessageManifest) - Some(system.dynamicAccess.getClassFor[AnyRef](selectionEnvelope.getMessageManifest.toStringUtf8).get) else None).get + manifest).get import scala.collection.JavaConverters._ val elements: immutable.Iterable[SelectionPathElement] = selectionEnvelope.getPatternList.asScala.map { x ⇒