This commit is contained in:
Viktor Klang 2011-12-29 16:11:56 +01:00
parent 840cacfd2a
commit cffe60bf43
8 changed files with 192 additions and 87 deletions

View file

@ -16,8 +16,8 @@ import com.google.protobuf.Message
class ProtobufSerializer extends Serializer {
val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]])
def identifier = 2: Byte
def includeManifest: Boolean = false
def identifier = 2: Serializer.Identifier
def toBinary(obj: AnyRef): Array[Byte] = {
if (!obj.isInstanceOf[Message]) throw new IllegalArgumentException(

View file

@ -14,7 +14,6 @@ import akka.actor.{ Extension, ActorSystem, ActorSystemImpl }
case class NoSerializerFoundException(m: String) extends AkkaException(m)
object Serialization {
// TODO ensure that these are always set (i.e. withValue()) when doing deserialization
val currentSystem = new DynamicVariable[ActorSystemImpl](null)
@ -23,9 +22,8 @@ object Serialization {
import scala.collection.JavaConverters._
import config._
val Serializers: Map[String, String] = {
toStringMap(getConfig("akka.actor.serializers"))
}
val Serializers: Map[String, String] =
getConfig("akka.actor.serializers").root.unwrapped.asScala.toMap.map { case (k, v) (k, v.toString) }
val SerializationBindings: Map[String, Seq[String]] = {
val configPath = "akka.actor.serialization-bindings"
@ -40,9 +38,6 @@ object Serialization {
}
}
private def toStringMap(mapConfig: Config): Map[String, String] =
mapConfig.root.unwrapped.asScala.toMap.map { case (k, v) (k, v.toString) }
}
}
@ -55,27 +50,52 @@ class Serialization(val system: ActorSystemImpl) extends Extension {
val settings = new Settings(system.settings.config)
//TODO document me
/**
* Serializes the given AnyRef/java.lang.Object according to the Serialization configuration
* to either an Array of Bytes or an Exception if one was thrown.
*/
def serialize(o: AnyRef): Either[Exception, Array[Byte]] =
try { Right(findSerializerFor(o).toBinary(o)) } catch { case e: Exception Left(e) }
//TODO document me
/**
* Deserializes the given array of bytes using the specified serializer id,
* using the optional type hint to the Serializer and the optional ClassLoader ot load it into.
* Returns either the resulting object or an Exception if one was thrown.
*/
def deserialize(bytes: Array[Byte],
serializerId: Serializer.Identifier,
clazz: Option[Class[_]],
classLoader: Option[ClassLoader]): Either[Exception, AnyRef] =
try {
currentSystem.withValue(system) {
Right(serializerByIdentity(serializerId).fromBinary(bytes, clazz, classLoader))
}
} catch { case e: Exception Left(e) }
/**
* Deserializes the given array of bytes using the specified type to look up what Serializer should be used.
* You can specify an optional ClassLoader to load the object into.
* Returns either the resulting object or an Exception if one was thrown.
*/
def deserialize(
bytes: Array[Byte],
clazz: Class[_],
classLoader: Option[ClassLoader]): Either[Exception, AnyRef] =
try {
currentSystem.withValue(system) {
Right(serializerFor(clazz).fromBinary(bytes, Some(clazz), classLoader))
}
currentSystem.withValue(system) { Right(serializerFor(clazz).fromBinary(bytes, Some(clazz), classLoader)) }
} catch { case e: Exception Left(e) }
/**
*
*/
def findSerializerFor(o: AnyRef): Serializer = o match {
case null NullSerializer
case other serializerFor(other.getClass)
}
//TODO document me
/**
*
*/
def serializerFor(clazz: Class[_]): Serializer = //TODO fall back on BestMatchClass THEN default AND memoize the lookups
serializerMap.get(clazz.getName).getOrElse(serializers("default"))
@ -85,6 +105,9 @@ class Serialization(val system: ActorSystemImpl) extends Extension {
def serializerOf(serializerFQN: String): Either[Exception, Serializer] =
ReflectiveAccess.createInstance(serializerFQN, ReflectiveAccess.noParams, ReflectiveAccess.noArgs)
/**
* FIXME implement support for this
*/
private def serializerForBestMatchClass(cl: Class[_]): Either[Exception, Serializer] = {
if (bindings.isEmpty)
Left(NoSerializerFoundException("No mapping serializer found for " + cl))

View file

@ -8,8 +8,7 @@ import java.io.{ ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, B
import akka.util.ClassLoaderObjectInputStream
object Serializer {
val defaultSerializerName = classOf[JavaSerializer].getName
type Identifier = Byte
type Identifier = Int
}
/**
@ -17,7 +16,7 @@ object Serializer {
*/
trait Serializer extends scala.Serializable {
/**
* Completely unique Byte value to identify this implementation of Serializer, used to optimize network traffic
* Completely unique value to identify this implementation of Serializer, used to optimize network traffic
* Values from 0 to 16 is reserved for Akka internal usage
*/
def identifier: Serializer.Identifier
@ -27,10 +26,15 @@ trait Serializer extends scala.Serializable {
*/
def toBinary(o: AnyRef): Array[Byte]
/**
* Returns whether this serializer needs a manifest in the fromBinary method
*/
def includeManifest: Boolean
/**
* Produces an object from an array of bytes, with an optional type-hint and a classloader to load the class into
*/
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]] = None, classLoader: Option[ClassLoader] = None): AnyRef
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]] = None, classLoader: Option[ClassLoader] = None): AnyRef
}
object JavaSerializer extends JavaSerializer
@ -38,7 +42,9 @@ object NullSerializer extends NullSerializer
class JavaSerializer extends Serializer {
def identifier = 1: Byte
def includeManifest: Boolean = false
def identifier = 1: Serializer.Identifier
def toBinary(o: AnyRef): Array[Byte] = {
val bos = new ByteArrayOutputStream
@ -60,10 +66,9 @@ class JavaSerializer extends Serializer {
}
class NullSerializer extends Serializer {
val nullAsBytes = Array[Byte]()
def identifier = 0: Byte
def includeManifest: Boolean = false
def identifier = 0: Serializer.Identifier
def toBinary(o: AnyRef) = nullAsBytes
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]] = None, classLoader: Option[ClassLoader] = None): AnyRef = null
}

View file

@ -9,7 +9,7 @@
.. contents:: :local:
Akka has a built-in Extension (TODO ADD REF) for serialization,
Akka has a built-in Extension for serialization,
and it is both possible to use the built-in serializers and to write your own.
The serialization mechanism is both used by Akka internally to serialize messages,

View file

@ -447,7 +447,7 @@ public final class RemoteProtocol {
maybeForceBuilderInitialization();
}
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
private Builder(BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -1119,7 +1119,7 @@ public final class RemoteProtocol {
maybeForceBuilderInitialization();
}
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
private Builder(BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -2115,7 +2115,7 @@ public final class RemoteProtocol {
maybeForceBuilderInitialization();
}
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
private Builder(BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -2665,7 +2665,7 @@ public final class RemoteProtocol {
maybeForceBuilderInitialization();
}
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
private Builder(BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -2842,7 +2842,11 @@ public final class RemoteProtocol {
boolean hasMessage();
com.google.protobuf.ByteString getMessage();
// optional bytes messageManifest = 2;
// required int32 serializerId = 2;
boolean hasSerializerId();
int getSerializerId();
// optional bytes messageManifest = 3;
boolean hasMessageManifest();
com.google.protobuf.ByteString getMessageManifest();
}
@ -2885,11 +2889,21 @@ public final class RemoteProtocol {
return message_;
}
// optional bytes messageManifest = 2;
public static final int MESSAGEMANIFEST_FIELD_NUMBER = 2;
// required int32 serializerId = 2;
public static final int SERIALIZERID_FIELD_NUMBER = 2;
private int serializerId_;
public boolean hasSerializerId() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
public int getSerializerId() {
return serializerId_;
}
// optional bytes messageManifest = 3;
public static final int MESSAGEMANIFEST_FIELD_NUMBER = 3;
private com.google.protobuf.ByteString messageManifest_;
public boolean hasMessageManifest() {
return ((bitField0_ & 0x00000002) == 0x00000002);
return ((bitField0_ & 0x00000004) == 0x00000004);
}
public com.google.protobuf.ByteString getMessageManifest() {
return messageManifest_;
@ -2897,6 +2911,7 @@ public final class RemoteProtocol {
private void initFields() {
message_ = com.google.protobuf.ByteString.EMPTY;
serializerId_ = 0;
messageManifest_ = com.google.protobuf.ByteString.EMPTY;
}
private byte memoizedIsInitialized = -1;
@ -2908,6 +2923,10 @@ public final class RemoteProtocol {
memoizedIsInitialized = 0;
return false;
}
if (!hasSerializerId()) {
memoizedIsInitialized = 0;
return false;
}
memoizedIsInitialized = 1;
return true;
}
@ -2919,7 +2938,10 @@ public final class RemoteProtocol {
output.writeBytes(1, message_);
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeBytes(2, messageManifest_);
output.writeInt32(2, serializerId_);
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeBytes(3, messageManifest_);
}
getUnknownFields().writeTo(output);
}
@ -2936,7 +2958,11 @@ public final class RemoteProtocol {
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(2, messageManifest_);
.computeInt32Size(2, serializerId_);
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(3, messageManifest_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
@ -3048,7 +3074,7 @@ public final class RemoteProtocol {
maybeForceBuilderInitialization();
}
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
private Builder(BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -3064,8 +3090,10 @@ public final class RemoteProtocol {
super.clear();
message_ = com.google.protobuf.ByteString.EMPTY;
bitField0_ = (bitField0_ & ~0x00000001);
messageManifest_ = com.google.protobuf.ByteString.EMPTY;
serializerId_ = 0;
bitField0_ = (bitField0_ & ~0x00000002);
messageManifest_ = com.google.protobuf.ByteString.EMPTY;
bitField0_ = (bitField0_ & ~0x00000004);
return this;
}
@ -3111,6 +3139,10 @@ public final class RemoteProtocol {
if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
to_bitField0_ |= 0x00000002;
}
result.serializerId_ = serializerId_;
if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
to_bitField0_ |= 0x00000004;
}
result.messageManifest_ = messageManifest_;
result.bitField0_ = to_bitField0_;
onBuilt();
@ -3131,6 +3163,9 @@ public final class RemoteProtocol {
if (other.hasMessage()) {
setMessage(other.getMessage());
}
if (other.hasSerializerId()) {
setSerializerId(other.getSerializerId());
}
if (other.hasMessageManifest()) {
setMessageManifest(other.getMessageManifest());
}
@ -3143,6 +3178,10 @@ public final class RemoteProtocol {
return false;
}
if (!hasSerializerId()) {
return false;
}
return true;
}
@ -3174,8 +3213,13 @@ public final class RemoteProtocol {
message_ = input.readBytes();
break;
}
case 18: {
case 16: {
bitField0_ |= 0x00000002;
serializerId_ = input.readInt32();
break;
}
case 26: {
bitField0_ |= 0x00000004;
messageManifest_ = input.readBytes();
break;
}
@ -3209,10 +3253,31 @@ public final class RemoteProtocol {
return this;
}
// optional bytes messageManifest = 2;
// required int32 serializerId = 2;
private int serializerId_ ;
public boolean hasSerializerId() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
public int getSerializerId() {
return serializerId_;
}
public Builder setSerializerId(int value) {
bitField0_ |= 0x00000002;
serializerId_ = value;
onChanged();
return this;
}
public Builder clearSerializerId() {
bitField0_ = (bitField0_ & ~0x00000002);
serializerId_ = 0;
onChanged();
return this;
}
// optional bytes messageManifest = 3;
private com.google.protobuf.ByteString messageManifest_ = com.google.protobuf.ByteString.EMPTY;
public boolean hasMessageManifest() {
return ((bitField0_ & 0x00000002) == 0x00000002);
return ((bitField0_ & 0x00000004) == 0x00000004);
}
public com.google.protobuf.ByteString getMessageManifest() {
return messageManifest_;
@ -3221,13 +3286,13 @@ public final class RemoteProtocol {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000002;
bitField0_ |= 0x00000004;
messageManifest_ = value;
onChanged();
return this;
}
public Builder clearMessageManifest() {
bitField0_ = (bitField0_ & ~0x00000002);
bitField0_ = (bitField0_ & ~0x00000004);
messageManifest_ = getDefaultInstance().getMessageManifest();
onChanged();
return this;
@ -3483,7 +3548,7 @@ public final class RemoteProtocol {
maybeForceBuilderInitialization();
}
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
private Builder(BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -3982,7 +4047,7 @@ public final class RemoteProtocol {
maybeForceBuilderInitialization();
}
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
private Builder(BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -4506,7 +4571,7 @@ public final class RemoteProtocol {
maybeForceBuilderInitialization();
}
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
private Builder(BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -4987,7 +5052,7 @@ public final class RemoteProtocol {
maybeForceBuilderInitialization();
}
private Builder(com.google.protobuf.GeneratedMessage.BuilderParent parent) {
private Builder(BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
@ -5447,32 +5512,32 @@ public final class RemoteProtocol {
descriptor;
static {
java.lang.String[] descriptorData = {
"\n\035protocol/RemoteProtocol.proto\"j\n\022AkkaR" +
"emoteProtocol\022\'\n\007message\030\001 \001(\0132\026.RemoteM" +
"essageProtocol\022+\n\013instruction\030\002 \001(\0132\026.Re" +
"moteControlProtocol\"\255\001\n\025RemoteMessagePro" +
"tocol\022$\n\trecipient\030\001 \002(\0132\021.ActorRefProto" +
"col\022!\n\007message\030\002 \002(\0132\020.MessageProtocol\022!" +
"\n\006sender\030\004 \001(\0132\021.ActorRefProtocol\022(\n\010met" +
"adata\030\005 \003(\0132\026.MetadataEntryProtocol\"l\n\025R" +
"emoteControlProtocol\022!\n\013commandType\030\001 \002(" +
"\0162\014.CommandType\022\016\n\006cookie\030\002 \001(\t\022 \n\006origi",
"n\030\003 \001(\0132\020.AddressProtocol\" \n\020ActorRefPro" +
"tocol\022\014\n\004path\030\001 \002(\t\";\n\017MessageProtocol\022\017" +
"\n\007message\030\001 \002(\014\022\027\n\017messageManifest\030\002 \001(\014" +
"\"3\n\025MetadataEntryProtocol\022\013\n\003key\030\001 \002(\t\022\r" +
"\n\005value\030\002 \002(\014\"A\n\017AddressProtocol\022\016\n\006syst" +
"em\030\001 \002(\t\022\020\n\010hostname\030\002 \002(\t\022\014\n\004port\030\003 \002(\r" +
"\"7\n\021ExceptionProtocol\022\021\n\tclassname\030\001 \002(\t" +
"\022\017\n\007message\030\002 \002(\t\"y\n\035DurableMailboxMessa" +
"geProtocol\022$\n\trecipient\030\001 \002(\0132\021.ActorRef" +
"Protocol\022!\n\006sender\030\002 \001(\0132\021.ActorRefProto",
"col\022\017\n\007message\030\003 \002(\014*(\n\013CommandType\022\013\n\007C" +
"ONNECT\020\001\022\014\n\010SHUTDOWN\020\002*K\n\026ReplicationSto" +
"rageType\022\r\n\tTRANSIENT\020\001\022\023\n\017TRANSACTION_L" +
"OG\020\002\022\r\n\tDATA_GRID\020\003*>\n\027ReplicationStrate" +
"gyType\022\021\n\rWRITE_THROUGH\020\001\022\020\n\014WRITE_BEHIN" +
"D\020\002B\017\n\013akka.remoteH\001"
"\n\024RemoteProtocol.proto\"j\n\022AkkaRemoteProt" +
"ocol\022\'\n\007message\030\001 \001(\0132\026.RemoteMessagePro" +
"tocol\022+\n\013instruction\030\002 \001(\0132\026.RemoteContr" +
"olProtocol\"\255\001\n\025RemoteMessageProtocol\022$\n\t" +
"recipient\030\001 \002(\0132\021.ActorRefProtocol\022!\n\007me" +
"ssage\030\002 \002(\0132\020.MessageProtocol\022!\n\006sender\030" +
"\004 \001(\0132\021.ActorRefProtocol\022(\n\010metadata\030\005 \003" +
"(\0132\026.MetadataEntryProtocol\"l\n\025RemoteCont" +
"rolProtocol\022!\n\013commandType\030\001 \002(\0162\014.Comma" +
"ndType\022\016\n\006cookie\030\002 \001(\t\022 \n\006origin\030\003 \001(\0132\020",
".AddressProtocol\" \n\020ActorRefProtocol\022\014\n\004" +
"path\030\001 \002(\t\"Q\n\017MessageProtocol\022\017\n\007message" +
"\030\001 \002(\014\022\024\n\014serializerId\030\002 \002(\005\022\027\n\017messageM" +
"anifest\030\003 \001(\014\"3\n\025MetadataEntryProtocol\022\013" +
"\n\003key\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"A\n\017AddressPro" +
"tocol\022\016\n\006system\030\001 \002(\t\022\020\n\010hostname\030\002 \002(\t\022" +
"\014\n\004port\030\003 \002(\r\"7\n\021ExceptionProtocol\022\021\n\tcl" +
"assname\030\001 \002(\t\022\017\n\007message\030\002 \002(\t\"y\n\035Durabl" +
"eMailboxMessageProtocol\022$\n\trecipient\030\001 \002" +
"(\0132\021.ActorRefProtocol\022!\n\006sender\030\002 \001(\0132\021.",
"ActorRefProtocol\022\017\n\007message\030\003 \002(\014*(\n\013Com" +
"mandType\022\013\n\007CONNECT\020\001\022\014\n\010SHUTDOWN\020\002*K\n\026R" +
"eplicationStorageType\022\r\n\tTRANSIENT\020\001\022\023\n\017" +
"TRANSACTION_LOG\020\002\022\r\n\tDATA_GRID\020\003*>\n\027Repl" +
"icationStrategyType\022\021\n\rWRITE_THROUGH\020\001\022\020" +
"\n\014WRITE_BEHIND\020\002B\017\n\013akka.remoteH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -5516,7 +5581,7 @@ public final class RemoteProtocol {
internal_static_MessageProtocol_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_MessageProtocol_descriptor,
new java.lang.String[] { "Message", "MessageManifest", },
new java.lang.String[] { "Message", "SerializerId", "MessageManifest", },
akka.remote.RemoteProtocol.MessageProtocol.class,
akka.remote.RemoteProtocol.MessageProtocol.Builder.class);
internal_static_MetadataEntryProtocol_descriptor =

View file

@ -73,7 +73,8 @@ message ActorRefProtocol {
*/
message MessageProtocol {
required bytes message = 1;
optional bytes messageManifest = 2;
required int32 serializerId = 2;
optional bytes messageManifest = 3;
}
/**

View file

@ -5,29 +5,40 @@
package akka.remote
import akka.remote.RemoteProtocol._
import akka.serialization.Serialization
import com.google.protobuf.ByteString
import akka.actor.ActorSystem
import akka.serialization.SerializationExtension
import akka.util.ReflectiveAccess
object MessageSerializer {
def deserialize(system: ActorSystem, messageProtocol: MessageProtocol, classLoader: Option[ClassLoader] = None): AnyRef = {
val clazz = loadManifest(classLoader, messageProtocol)
SerializationExtension(system).deserialize(messageProtocol.getMessage.toByteArray,
clazz, classLoader).fold(x throw x, identity)
val clazz = if (messageProtocol.hasMessageManifest) {
Option(ReflectiveAccess.getClassFor[AnyRef](
messageProtocol.getMessageManifest.toStringUtf8,
classLoader.getOrElse(ReflectiveAccess.loader)) match {
case Left(e) throw e
case Right(r) r
})
} else None
SerializationExtension(system).deserialize(
messageProtocol.getMessage.toByteArray,
messageProtocol.getSerializerId,
clazz,
classLoader) match {
case Left(e) throw e
case Right(r) r
}
}
def serialize(system: ActorSystem, message: AnyRef): MessageProtocol = {
val s = SerializationExtension(system)
val serializer = s.findSerializerFor(message)
val builder = MessageProtocol.newBuilder
val bytes = SerializationExtension(system).serialize(message).fold(x throw x, identity)
builder.setMessage(ByteString.copyFrom(bytes))
builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName))
builder.setMessage(ByteString.copyFrom(serializer.toBinary(message)))
builder.setSerializerId(serializer.identifier)
if (serializer.includeManifest)
builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName))
builder.build
}
private def loadManifest(classLoader: Option[ClassLoader], messageProtocol: MessageProtocol): Class[_] = {
val manifest = messageProtocol.getMessageManifest.toStringUtf8
classLoader map (_.loadClass(manifest)) getOrElse (Class.forName(manifest))
}
}

View file

@ -9,8 +9,8 @@ import com.google.protobuf.Message
class ProtobufSerializer extends Serializer {
val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]])
def identifier = 2: Byte
def includeManifest: Boolean = false
def identifier = 2: Serializer.Identifier
def toBinary(obj: AnyRef): Array[Byte] = {
if (!obj.isInstanceOf[Message]) throw new IllegalArgumentException(