diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index d449510351..b53dc33328 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -16,8 +16,8 @@ import com.google.protobuf.Message class ProtobufSerializer extends Serializer { val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]]) - - def identifier = 2: Byte + def includeManifest: Boolean = false + def identifier = 2: Serializer.Identifier def toBinary(obj: AnyRef): Array[Byte] = { if (!obj.isInstanceOf[Message]) throw new IllegalArgumentException( diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala index d1764e8390..4da4f04e9a 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala @@ -14,7 +14,6 @@ import akka.actor.{ Extension, ActorSystem, ActorSystemImpl } case class NoSerializerFoundException(m: String) extends AkkaException(m) object Serialization { - // TODO ensure that these are always set (i.e. withValue()) when doing deserialization val currentSystem = new DynamicVariable[ActorSystemImpl](null) @@ -23,9 +22,8 @@ object Serialization { import scala.collection.JavaConverters._ import config._ - val Serializers: Map[String, String] = { - toStringMap(getConfig("akka.actor.serializers")) - } + val Serializers: Map[String, String] = + getConfig("akka.actor.serializers").root.unwrapped.asScala.toMap.map { case (k, v) ⇒ (k, v.toString) } val SerializationBindings: Map[String, Seq[String]] = { val configPath = "akka.actor.serialization-bindings" @@ -40,9 +38,6 @@ object Serialization { } } - - private def toStringMap(mapConfig: Config): Map[String, String] = - mapConfig.root.unwrapped.asScala.toMap.map { case (k, v) ⇒ (k, v.toString) } } } @@ -55,27 +50,52 @@ class Serialization(val system: ActorSystemImpl) extends Extension { val settings = new Settings(system.settings.config) - //TODO document me + /** + * Serializes the given AnyRef/java.lang.Object according to the Serialization configuration + * to either an Array of Bytes or an Exception if one was thrown. + */ def serialize(o: AnyRef): Either[Exception, Array[Byte]] = try { Right(findSerializerFor(o).toBinary(o)) } catch { case e: Exception ⇒ Left(e) } - //TODO document me + /** + * Deserializes the given array of bytes using the specified serializer id, + * using the optional type hint to the Serializer and the optional ClassLoader ot load it into. + * Returns either the resulting object or an Exception if one was thrown. + */ + def deserialize(bytes: Array[Byte], + serializerId: Serializer.Identifier, + clazz: Option[Class[_]], + classLoader: Option[ClassLoader]): Either[Exception, AnyRef] = + try { + currentSystem.withValue(system) { + Right(serializerByIdentity(serializerId).fromBinary(bytes, clazz, classLoader)) + } + } catch { case e: Exception ⇒ Left(e) } + + /** + * Deserializes the given array of bytes using the specified type to look up what Serializer should be used. + * You can specify an optional ClassLoader to load the object into. + * Returns either the resulting object or an Exception if one was thrown. + */ def deserialize( bytes: Array[Byte], clazz: Class[_], classLoader: Option[ClassLoader]): Either[Exception, AnyRef] = try { - currentSystem.withValue(system) { - Right(serializerFor(clazz).fromBinary(bytes, Some(clazz), classLoader)) - } + currentSystem.withValue(system) { Right(serializerFor(clazz).fromBinary(bytes, Some(clazz), classLoader)) } } catch { case e: Exception ⇒ Left(e) } + /** + * + */ def findSerializerFor(o: AnyRef): Serializer = o match { case null ⇒ NullSerializer case other ⇒ serializerFor(other.getClass) } - //TODO document me + /** + * + */ def serializerFor(clazz: Class[_]): Serializer = //TODO fall back on BestMatchClass THEN default AND memoize the lookups serializerMap.get(clazz.getName).getOrElse(serializers("default")) @@ -85,6 +105,9 @@ class Serialization(val system: ActorSystemImpl) extends Extension { def serializerOf(serializerFQN: String): Either[Exception, Serializer] = ReflectiveAccess.createInstance(serializerFQN, ReflectiveAccess.noParams, ReflectiveAccess.noArgs) + /** + * FIXME implement support for this + */ private def serializerForBestMatchClass(cl: Class[_]): Either[Exception, Serializer] = { if (bindings.isEmpty) Left(NoSerializerFoundException("No mapping serializer found for " + cl)) diff --git a/akka-actor/src/main/scala/akka/serialization/Serializer.scala b/akka-actor/src/main/scala/akka/serialization/Serializer.scala index 893e974859..27a448ef1e 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serializer.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serializer.scala @@ -8,8 +8,7 @@ import java.io.{ ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, B import akka.util.ClassLoaderObjectInputStream object Serializer { - val defaultSerializerName = classOf[JavaSerializer].getName - type Identifier = Byte + type Identifier = Int } /** @@ -17,7 +16,7 @@ object Serializer { */ trait Serializer extends scala.Serializable { /** - * Completely unique Byte value to identify this implementation of Serializer, used to optimize network traffic + * Completely unique value to identify this implementation of Serializer, used to optimize network traffic * Values from 0 to 16 is reserved for Akka internal usage */ def identifier: Serializer.Identifier @@ -27,10 +26,15 @@ trait Serializer extends scala.Serializable { */ def toBinary(o: AnyRef): Array[Byte] + /** + * Returns whether this serializer needs a manifest in the fromBinary method + */ + def includeManifest: Boolean + /** * Produces an object from an array of bytes, with an optional type-hint and a classloader to load the class into */ - def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]] = None, classLoader: Option[ClassLoader] = None): AnyRef + def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]] = None, classLoader: Option[ClassLoader] = None): AnyRef } object JavaSerializer extends JavaSerializer @@ -38,7 +42,9 @@ object NullSerializer extends NullSerializer class JavaSerializer extends Serializer { - def identifier = 1: Byte + def includeManifest: Boolean = false + + def identifier = 1: Serializer.Identifier def toBinary(o: AnyRef): Array[Byte] = { val bos = new ByteArrayOutputStream @@ -60,10 +66,9 @@ class JavaSerializer extends Serializer { } class NullSerializer extends Serializer { - val nullAsBytes = Array[Byte]() - - def identifier = 0: Byte + def includeManifest: Boolean = false + def identifier = 0: Serializer.Identifier def toBinary(o: AnyRef) = nullAsBytes def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]] = None, classLoader: Option[ClassLoader] = None): AnyRef = null } diff --git a/akka-docs/scala/serialization.rst b/akka-docs/scala/serialization.rst index b338e66891..3f5f30669d 100644 --- a/akka-docs/scala/serialization.rst +++ b/akka-docs/scala/serialization.rst @@ -9,7 +9,7 @@ .. contents:: :local: -Akka has a built-in Extension (TODO ADD REF) for serialization, +Akka has a built-in Extension for serialization, and it is both possible to use the built-in serializers and to write your own. The serialization mechanism is both used by Akka internally to serialize messages, diff --git a/akka-remote/src/main/java/akka/remote/RemoteProtocol.java b/akka-remote/src/main/java/akka/remote/RemoteProtocol.java index 663e534bb2..6708cd18f9 100644 --- a/akka-remote/src/main/java/akka/remote/RemoteProtocol.java +++ b/akka-remote/src/main/java/akka/remote/RemoteProtocol.java @@ -447,7 +447,7 @@ public final class RemoteProtocol { maybeForceBuilderInitialization(); } - private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { + private Builder(BuilderParent parent) { super(parent); maybeForceBuilderInitialization(); } @@ -1119,7 +1119,7 @@ public final class RemoteProtocol { maybeForceBuilderInitialization(); } - private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { + private Builder(BuilderParent parent) { super(parent); maybeForceBuilderInitialization(); } @@ -2115,7 +2115,7 @@ public final class RemoteProtocol { maybeForceBuilderInitialization(); } - private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { + private Builder(BuilderParent parent) { super(parent); maybeForceBuilderInitialization(); } @@ -2665,7 +2665,7 @@ public final class RemoteProtocol { maybeForceBuilderInitialization(); } - private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { + private Builder(BuilderParent parent) { super(parent); maybeForceBuilderInitialization(); } @@ -2842,7 +2842,11 @@ public final class RemoteProtocol { boolean hasMessage(); com.google.protobuf.ByteString getMessage(); - // optional bytes messageManifest = 2; + // required int32 serializerId = 2; + boolean hasSerializerId(); + int getSerializerId(); + + // optional bytes messageManifest = 3; boolean hasMessageManifest(); com.google.protobuf.ByteString getMessageManifest(); } @@ -2885,11 +2889,21 @@ public final class RemoteProtocol { return message_; } - // optional bytes messageManifest = 2; - public static final int MESSAGEMANIFEST_FIELD_NUMBER = 2; + // required int32 serializerId = 2; + public static final int SERIALIZERID_FIELD_NUMBER = 2; + private int serializerId_; + public boolean hasSerializerId() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public int getSerializerId() { + return serializerId_; + } + + // optional bytes messageManifest = 3; + public static final int MESSAGEMANIFEST_FIELD_NUMBER = 3; private com.google.protobuf.ByteString messageManifest_; public boolean hasMessageManifest() { - return ((bitField0_ & 0x00000002) == 0x00000002); + return ((bitField0_ & 0x00000004) == 0x00000004); } public com.google.protobuf.ByteString getMessageManifest() { return messageManifest_; @@ -2897,6 +2911,7 @@ public final class RemoteProtocol { private void initFields() { message_ = com.google.protobuf.ByteString.EMPTY; + serializerId_ = 0; messageManifest_ = com.google.protobuf.ByteString.EMPTY; } private byte memoizedIsInitialized = -1; @@ -2908,6 +2923,10 @@ public final class RemoteProtocol { memoizedIsInitialized = 0; return false; } + if (!hasSerializerId()) { + memoizedIsInitialized = 0; + return false; + } memoizedIsInitialized = 1; return true; } @@ -2919,7 +2938,10 @@ public final class RemoteProtocol { output.writeBytes(1, message_); } if (((bitField0_ & 0x00000002) == 0x00000002)) { - output.writeBytes(2, messageManifest_); + output.writeInt32(2, serializerId_); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeBytes(3, messageManifest_); } getUnknownFields().writeTo(output); } @@ -2936,7 +2958,11 @@ public final class RemoteProtocol { } if (((bitField0_ & 0x00000002) == 0x00000002)) { size += com.google.protobuf.CodedOutputStream - .computeBytesSize(2, messageManifest_); + .computeInt32Size(2, serializerId_); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(3, messageManifest_); } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; @@ -3048,7 +3074,7 @@ public final class RemoteProtocol { maybeForceBuilderInitialization(); } - private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { + private Builder(BuilderParent parent) { super(parent); maybeForceBuilderInitialization(); } @@ -3064,8 +3090,10 @@ public final class RemoteProtocol { super.clear(); message_ = com.google.protobuf.ByteString.EMPTY; bitField0_ = (bitField0_ & ~0x00000001); - messageManifest_ = com.google.protobuf.ByteString.EMPTY; + serializerId_ = 0; bitField0_ = (bitField0_ & ~0x00000002); + messageManifest_ = com.google.protobuf.ByteString.EMPTY; + bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -3111,6 +3139,10 @@ public final class RemoteProtocol { if (((from_bitField0_ & 0x00000002) == 0x00000002)) { to_bitField0_ |= 0x00000002; } + result.serializerId_ = serializerId_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } result.messageManifest_ = messageManifest_; result.bitField0_ = to_bitField0_; onBuilt(); @@ -3131,6 +3163,9 @@ public final class RemoteProtocol { if (other.hasMessage()) { setMessage(other.getMessage()); } + if (other.hasSerializerId()) { + setSerializerId(other.getSerializerId()); + } if (other.hasMessageManifest()) { setMessageManifest(other.getMessageManifest()); } @@ -3143,6 +3178,10 @@ public final class RemoteProtocol { return false; } + if (!hasSerializerId()) { + + return false; + } return true; } @@ -3174,8 +3213,13 @@ public final class RemoteProtocol { message_ = input.readBytes(); break; } - case 18: { + case 16: { bitField0_ |= 0x00000002; + serializerId_ = input.readInt32(); + break; + } + case 26: { + bitField0_ |= 0x00000004; messageManifest_ = input.readBytes(); break; } @@ -3209,10 +3253,31 @@ public final class RemoteProtocol { return this; } - // optional bytes messageManifest = 2; + // required int32 serializerId = 2; + private int serializerId_ ; + public boolean hasSerializerId() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public int getSerializerId() { + return serializerId_; + } + public Builder setSerializerId(int value) { + bitField0_ |= 0x00000002; + serializerId_ = value; + onChanged(); + return this; + } + public Builder clearSerializerId() { + bitField0_ = (bitField0_ & ~0x00000002); + serializerId_ = 0; + onChanged(); + return this; + } + + // optional bytes messageManifest = 3; private com.google.protobuf.ByteString messageManifest_ = com.google.protobuf.ByteString.EMPTY; public boolean hasMessageManifest() { - return ((bitField0_ & 0x00000002) == 0x00000002); + return ((bitField0_ & 0x00000004) == 0x00000004); } public com.google.protobuf.ByteString getMessageManifest() { return messageManifest_; @@ -3221,13 +3286,13 @@ public final class RemoteProtocol { if (value == null) { throw new NullPointerException(); } - bitField0_ |= 0x00000002; + bitField0_ |= 0x00000004; messageManifest_ = value; onChanged(); return this; } public Builder clearMessageManifest() { - bitField0_ = (bitField0_ & ~0x00000002); + bitField0_ = (bitField0_ & ~0x00000004); messageManifest_ = getDefaultInstance().getMessageManifest(); onChanged(); return this; @@ -3483,7 +3548,7 @@ public final class RemoteProtocol { maybeForceBuilderInitialization(); } - private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { + private Builder(BuilderParent parent) { super(parent); maybeForceBuilderInitialization(); } @@ -3982,7 +4047,7 @@ public final class RemoteProtocol { maybeForceBuilderInitialization(); } - private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { + private Builder(BuilderParent parent) { super(parent); maybeForceBuilderInitialization(); } @@ -4506,7 +4571,7 @@ public final class RemoteProtocol { maybeForceBuilderInitialization(); } - private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { + private Builder(BuilderParent parent) { super(parent); maybeForceBuilderInitialization(); } @@ -4987,7 +5052,7 @@ public final class RemoteProtocol { maybeForceBuilderInitialization(); } - private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) { + private Builder(BuilderParent parent) { super(parent); maybeForceBuilderInitialization(); } @@ -5447,32 +5512,32 @@ public final class RemoteProtocol { descriptor; static { java.lang.String[] descriptorData = { - "\n\035protocol/RemoteProtocol.proto\"j\n\022AkkaR" + - "emoteProtocol\022\'\n\007message\030\001 \001(\0132\026.RemoteM" + - "essageProtocol\022+\n\013instruction\030\002 \001(\0132\026.Re" + - "moteControlProtocol\"\255\001\n\025RemoteMessagePro" + - "tocol\022$\n\trecipient\030\001 \002(\0132\021.ActorRefProto" + - "col\022!\n\007message\030\002 \002(\0132\020.MessageProtocol\022!" + - "\n\006sender\030\004 \001(\0132\021.ActorRefProtocol\022(\n\010met" + - "adata\030\005 \003(\0132\026.MetadataEntryProtocol\"l\n\025R" + - "emoteControlProtocol\022!\n\013commandType\030\001 \002(" + - "\0162\014.CommandType\022\016\n\006cookie\030\002 \001(\t\022 \n\006origi", - "n\030\003 \001(\0132\020.AddressProtocol\" \n\020ActorRefPro" + - "tocol\022\014\n\004path\030\001 \002(\t\";\n\017MessageProtocol\022\017" + - "\n\007message\030\001 \002(\014\022\027\n\017messageManifest\030\002 \001(\014" + - "\"3\n\025MetadataEntryProtocol\022\013\n\003key\030\001 \002(\t\022\r" + - "\n\005value\030\002 \002(\014\"A\n\017AddressProtocol\022\016\n\006syst" + - "em\030\001 \002(\t\022\020\n\010hostname\030\002 \002(\t\022\014\n\004port\030\003 \002(\r" + - "\"7\n\021ExceptionProtocol\022\021\n\tclassname\030\001 \002(\t" + - "\022\017\n\007message\030\002 \002(\t\"y\n\035DurableMailboxMessa" + - "geProtocol\022$\n\trecipient\030\001 \002(\0132\021.ActorRef" + - "Protocol\022!\n\006sender\030\002 \001(\0132\021.ActorRefProto", - "col\022\017\n\007message\030\003 \002(\014*(\n\013CommandType\022\013\n\007C" + - "ONNECT\020\001\022\014\n\010SHUTDOWN\020\002*K\n\026ReplicationSto" + - "rageType\022\r\n\tTRANSIENT\020\001\022\023\n\017TRANSACTION_L" + - "OG\020\002\022\r\n\tDATA_GRID\020\003*>\n\027ReplicationStrate" + - "gyType\022\021\n\rWRITE_THROUGH\020\001\022\020\n\014WRITE_BEHIN" + - "D\020\002B\017\n\013akka.remoteH\001" + "\n\024RemoteProtocol.proto\"j\n\022AkkaRemoteProt" + + "ocol\022\'\n\007message\030\001 \001(\0132\026.RemoteMessagePro" + + "tocol\022+\n\013instruction\030\002 \001(\0132\026.RemoteContr" + + "olProtocol\"\255\001\n\025RemoteMessageProtocol\022$\n\t" + + "recipient\030\001 \002(\0132\021.ActorRefProtocol\022!\n\007me" + + "ssage\030\002 \002(\0132\020.MessageProtocol\022!\n\006sender\030" + + "\004 \001(\0132\021.ActorRefProtocol\022(\n\010metadata\030\005 \003" + + "(\0132\026.MetadataEntryProtocol\"l\n\025RemoteCont" + + "rolProtocol\022!\n\013commandType\030\001 \002(\0162\014.Comma" + + "ndType\022\016\n\006cookie\030\002 \001(\t\022 \n\006origin\030\003 \001(\0132\020", + ".AddressProtocol\" \n\020ActorRefProtocol\022\014\n\004" + + "path\030\001 \002(\t\"Q\n\017MessageProtocol\022\017\n\007message" + + "\030\001 \002(\014\022\024\n\014serializerId\030\002 \002(\005\022\027\n\017messageM" + + "anifest\030\003 \001(\014\"3\n\025MetadataEntryProtocol\022\013" + + "\n\003key\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"A\n\017AddressPro" + + "tocol\022\016\n\006system\030\001 \002(\t\022\020\n\010hostname\030\002 \002(\t\022" + + "\014\n\004port\030\003 \002(\r\"7\n\021ExceptionProtocol\022\021\n\tcl" + + "assname\030\001 \002(\t\022\017\n\007message\030\002 \002(\t\"y\n\035Durabl" + + "eMailboxMessageProtocol\022$\n\trecipient\030\001 \002" + + "(\0132\021.ActorRefProtocol\022!\n\006sender\030\002 \001(\0132\021.", + "ActorRefProtocol\022\017\n\007message\030\003 \002(\014*(\n\013Com" + + "mandType\022\013\n\007CONNECT\020\001\022\014\n\010SHUTDOWN\020\002*K\n\026R" + + "eplicationStorageType\022\r\n\tTRANSIENT\020\001\022\023\n\017" + + "TRANSACTION_LOG\020\002\022\r\n\tDATA_GRID\020\003*>\n\027Repl" + + "icationStrategyType\022\021\n\rWRITE_THROUGH\020\001\022\020" + + "\n\014WRITE_BEHIND\020\002B\017\n\013akka.remoteH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -5516,7 +5581,7 @@ public final class RemoteProtocol { internal_static_MessageProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_MessageProtocol_descriptor, - new java.lang.String[] { "Message", "MessageManifest", }, + new java.lang.String[] { "Message", "SerializerId", "MessageManifest", }, akka.remote.RemoteProtocol.MessageProtocol.class, akka.remote.RemoteProtocol.MessageProtocol.Builder.class); internal_static_MetadataEntryProtocol_descriptor = diff --git a/akka-remote/src/main/protocol/RemoteProtocol.proto b/akka-remote/src/main/protocol/RemoteProtocol.proto index 3f54b5a633..557d4376a1 100644 --- a/akka-remote/src/main/protocol/RemoteProtocol.proto +++ b/akka-remote/src/main/protocol/RemoteProtocol.proto @@ -73,7 +73,8 @@ message ActorRefProtocol { */ message MessageProtocol { required bytes message = 1; - optional bytes messageManifest = 2; + required int32 serializerId = 2; + optional bytes messageManifest = 3; } /** diff --git a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala index 03e10e770b..56c8ec2ed8 100644 --- a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala @@ -5,29 +5,40 @@ package akka.remote import akka.remote.RemoteProtocol._ -import akka.serialization.Serialization import com.google.protobuf.ByteString import akka.actor.ActorSystem import akka.serialization.SerializationExtension +import akka.util.ReflectiveAccess object MessageSerializer { def deserialize(system: ActorSystem, messageProtocol: MessageProtocol, classLoader: Option[ClassLoader] = None): AnyRef = { - val clazz = loadManifest(classLoader, messageProtocol) - SerializationExtension(system).deserialize(messageProtocol.getMessage.toByteArray, - clazz, classLoader).fold(x ⇒ throw x, identity) + val clazz = if (messageProtocol.hasMessageManifest) { + Option(ReflectiveAccess.getClassFor[AnyRef]( + messageProtocol.getMessageManifest.toStringUtf8, + classLoader.getOrElse(ReflectiveAccess.loader)) match { + case Left(e) ⇒ throw e + case Right(r) ⇒ r + }) + } else None + SerializationExtension(system).deserialize( + messageProtocol.getMessage.toByteArray, + messageProtocol.getSerializerId, + clazz, + classLoader) match { + case Left(e) ⇒ throw e + case Right(r) ⇒ r + } } def serialize(system: ActorSystem, message: AnyRef): MessageProtocol = { + val s = SerializationExtension(system) + val serializer = s.findSerializerFor(message) val builder = MessageProtocol.newBuilder - val bytes = SerializationExtension(system).serialize(message).fold(x ⇒ throw x, identity) - builder.setMessage(ByteString.copyFrom(bytes)) - builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName)) + builder.setMessage(ByteString.copyFrom(serializer.toBinary(message))) + builder.setSerializerId(serializer.identifier) + if (serializer.includeManifest) + builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName)) builder.build } - - private def loadManifest(classLoader: Option[ClassLoader], messageProtocol: MessageProtocol): Class[_] = { - val manifest = messageProtocol.getMessageManifest.toStringUtf8 - classLoader map (_.loadClass(manifest)) getOrElse (Class.forName(manifest)) - } } diff --git a/akka-remote/src/main/scala/akka/serialization/ProtobufSerializer.scala b/akka-remote/src/main/scala/akka/serialization/ProtobufSerializer.scala index ae908bdd48..9a0da2d1ff 100644 --- a/akka-remote/src/main/scala/akka/serialization/ProtobufSerializer.scala +++ b/akka-remote/src/main/scala/akka/serialization/ProtobufSerializer.scala @@ -9,8 +9,8 @@ import com.google.protobuf.Message class ProtobufSerializer extends Serializer { val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]]) - - def identifier = 2: Byte + def includeManifest: Boolean = false + def identifier = 2: Serializer.Identifier def toBinary(obj: AnyRef): Array[Byte] = { if (!obj.isInstanceOf[Message]) throw new IllegalArgumentException(