Fixed remote actor protobuf message serialization problem + added tests

This commit is contained in:
Jonas Bonér 2010-05-05 12:13:25 +02:00
parent 3f38822afb
commit f168a36090
5 changed files with 517 additions and 13 deletions

View file

@ -17,7 +17,6 @@ object RemoteProtocolBuilder {
private var SERIALIZER_SBINARY: Serializer.SBinary = Serializer.SBinary
private var SERIALIZER_PROTOBUF: Serializer.Protobuf = Serializer.Protobuf
def setClassLoader(cl: ClassLoader) = {
SERIALIZER_JAVA.classLoader = Some(cl)
SERIALIZER_JAVA_JSON.classLoader = Some(cl)
@ -26,6 +25,8 @@ object RemoteProtocolBuilder {
def getMessage(request: RemoteRequest): Any = {
request.getProtocol match {
case SerializationProtocol.JAVA =>
unbox(SERIALIZER_JAVA.in(request.getMessage.toByteArray, None))
case SerializationProtocol.SBINARY =>
val renderer = Class.forName(new String(request.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]]
renderer.fromBytes(request.getMessage.toByteArray)
@ -36,17 +37,19 @@ object RemoteProtocolBuilder {
val manifest = SERIALIZER_JAVA.in(request.getMessageManifest.toByteArray, None).asInstanceOf[String]
SERIALIZER_JAVA_JSON.in(request.getMessage.toByteArray, Some(Class.forName(manifest)))
case SerializationProtocol.PROTOBUF =>
val messageClass = SERIALIZER_JAVA.in(request.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]]
val protobufMessage = messageClass.newInstance.asInstanceOf[Serializable.Protobuf[_]]
protobufMessage.fromBytes(request.getMessage.toByteArray)
case SerializationProtocol.PROTOBUF_RAW =>
val messageClass = SERIALIZER_JAVA.in(request.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]]
SERIALIZER_PROTOBUF.in(request.getMessage.toByteArray, Some(messageClass))
case SerializationProtocol.JAVA =>
unbox(SERIALIZER_JAVA.in(request.getMessage.toByteArray, None))
case SerializationProtocol.AVRO =>
throw new UnsupportedOperationException("Avro protocol is not yet supported")
}
}
def getMessage(reply: RemoteReply): Any = {
reply.getProtocol match {
case SerializationProtocol.JAVA =>
unbox(SERIALIZER_JAVA.in(reply.getMessage.toByteArray, None))
case SerializationProtocol.SBINARY =>
val renderer = Class.forName(new String(reply.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]]
renderer.fromBytes(reply.getMessage.toByteArray)
@ -57,12 +60,12 @@ object RemoteProtocolBuilder {
val manifest = SERIALIZER_JAVA.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[String]
SERIALIZER_JAVA_JSON.in(reply.getMessage.toByteArray, Some(Class.forName(manifest)))
case SerializationProtocol.PROTOBUF =>
val messageClass = SERIALIZER_JAVA.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]]
val protobufMessage = messageClass.newInstance.asInstanceOf[Serializable.Protobuf[_]]
protobufMessage.fromBytes(reply.getMessage.toByteArray)
case SerializationProtocol.PROTOBUF_RAW =>
val messageClass = SERIALIZER_JAVA.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]]
SERIALIZER_PROTOBUF.in(reply.getMessage.toByteArray, Some(messageClass))
case SerializationProtocol.JAVA =>
unbox(SERIALIZER_JAVA.in(reply.getMessage.toByteArray, None))
case SerializationProtocol.AVRO =>
throw new UnsupportedOperationException("Avro protocol is not yet supported")
}
}
@ -72,9 +75,14 @@ object RemoteProtocolBuilder {
builder.setProtocol(SerializationProtocol.SBINARY)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
} else if (message.isInstanceOf[Serializable.Protobuf[_]]) {
val serializable = message.asInstanceOf[Serializable.Protobuf[_]]
builder.setProtocol(SerializationProtocol.PROTOBUF)
builder.setMessage(ByteString.copyFrom(serializable.getMessage.toByteArray))
builder.setMessageManifest(ByteString.copyFrom(SERIALIZER_JAVA.out(serializable.getClass)))
} else if (message.isInstanceOf[Message]) {
val serializable = message.asInstanceOf[Message]
builder.setProtocol(SerializationProtocol.PROTOBUF)
builder.setProtocol(SerializationProtocol.PROTOBUF_RAW)
builder.setMessage(ByteString.copyFrom(serializable.toByteArray))
builder.setMessageManifest(ByteString.copyFrom(SERIALIZER_JAVA.out(serializable.getClass)))
} else if (message.isInstanceOf[Serializable.ScalaJSON]) {
@ -100,9 +108,14 @@ object RemoteProtocolBuilder {
builder.setProtocol(SerializationProtocol.SBINARY)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
} else if (message.isInstanceOf[Serializable.Protobuf[_]]) {
val serializable = message.asInstanceOf[Serializable.Protobuf[_]]
builder.setProtocol(SerializationProtocol.PROTOBUF)
builder.setMessage(ByteString.copyFrom(serializable.getMessage.toByteArray))
builder.setMessageManifest(ByteString.copyFrom(SERIALIZER_JAVA.out(serializable.getClass)))
} else if (message.isInstanceOf[Message]) {
val serializable = message.asInstanceOf[Message]
builder.setProtocol(SerializationProtocol.PROTOBUF)
builder.setProtocol(SerializationProtocol.PROTOBUF_RAW)
builder.setMessage(ByteString.copyFrom(serializable.toByteArray))
builder.setMessageManifest(ByteString.copyFrom(SERIALIZER_JAVA.out(serializable.getClass)))
} else if (message.isInstanceOf[Serializable.ScalaJSON]) {

View file

@ -16,12 +16,12 @@ import java.io.{StringWriter, ByteArrayOutputStream, ObjectOutputStream}
import sjson.json.{Serializer=>SJSONSerializer}
object SerializationProtocol {
val JAVA = 0
val SBINARY = 1
val SCALA_JSON = 2
val JAVA_JSON = 3
val PROTOBUF = 4
val JAVA = 5
val AVRO = 6
val PROTOBUF_RAW = 5
}
/**

View file

@ -0,0 +1,17 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.actor;
/*
Compile with:
cd ./akka-core/src/test/java
protoc ProtobufProtocol.proto --java_out .
*/
message ProtobufPOJO {
required uint64 id = 1;
required string name = 2;
required bool status = 3;
}

View file

@ -0,0 +1,402 @@
// Generated by the protocol buffer compiler. DO NOT EDIT!
package se.scalablesolutions.akka.actor;
public final class ProtobufProtocol {
private ProtobufProtocol() {}
public static void registerAllExtensions(
com.google.protobuf.ExtensionRegistry registry) {
}
public static final class ProtobufPOJO extends
com.google.protobuf.GeneratedMessage {
// Use ProtobufPOJO.newBuilder() to construct.
private ProtobufPOJO() {}
private static final ProtobufPOJO defaultInstance = new ProtobufPOJO();
public static ProtobufPOJO getDefaultInstance() {
return defaultInstance;
}
public ProtobufPOJO getDefaultInstanceForType() {
return defaultInstance;
}
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return se.scalablesolutions.akka.actor.ProtobufProtocol.internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return se.scalablesolutions.akka.actor.ProtobufProtocol.internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_fieldAccessorTable;
}
// required uint64 id = 1;
public static final int ID_FIELD_NUMBER = 1;
private boolean hasId;
private long id_ = 0L;
public boolean hasId() { return hasId; }
public long getId() { return id_; }
// required string name = 2;
public static final int NAME_FIELD_NUMBER = 2;
private boolean hasName;
private java.lang.String name_ = "";
public boolean hasName() { return hasName; }
public java.lang.String getName() { return name_; }
// required bool status = 3;
public static final int STATUS_FIELD_NUMBER = 3;
private boolean hasStatus;
private boolean status_ = false;
public boolean hasStatus() { return hasStatus; }
public boolean getStatus() { return status_; }
public final boolean isInitialized() {
if (!hasId) return false;
if (!hasName) return false;
if (!hasStatus) return false;
return true;
}
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
if (hasId()) {
output.writeUInt64(1, getId());
}
if (hasName()) {
output.writeString(2, getName());
}
if (hasStatus()) {
output.writeBool(3, getStatus());
}
getUnknownFields().writeTo(output);
}
private int memoizedSerializedSize = -1;
public int getSerializedSize() {
int size = memoizedSerializedSize;
if (size != -1) return size;
size = 0;
if (hasId()) {
size += com.google.protobuf.CodedOutputStream
.computeUInt64Size(1, getId());
}
if (hasName()) {
size += com.google.protobuf.CodedOutputStream
.computeStringSize(2, getName());
}
if (hasStatus()) {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(3, getStatus());
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
}
public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom(
com.google.protobuf.ByteString data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom(
com.google.protobuf.ByteString data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom(byte[] data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom(
byte[] data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom(java.io.InputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
return newBuilder().mergeDelimitedFrom(input).buildParsed();
}
public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseDelimitedFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeDelimitedFrom(input, extensionRegistry)
.buildParsed();
}
public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom(
com.google.protobuf.CodedInputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static Builder newBuilder() { return Builder.create(); }
public Builder newBuilderForType() { return newBuilder(); }
public static Builder newBuilder(se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO prototype) {
return newBuilder().mergeFrom(prototype);
}
public Builder toBuilder() { return newBuilder(this); }
public static final class Builder extends
com.google.protobuf.GeneratedMessage.Builder<Builder> {
private se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO result;
// Construct using se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO.newBuilder()
private Builder() {}
private static Builder create() {
Builder builder = new Builder();
builder.result = new se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO();
return builder;
}
protected se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO internalGetResult() {
return result;
}
public Builder clear() {
if (result == null) {
throw new IllegalStateException(
"Cannot call clear() after build().");
}
result = new se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO();
return this;
}
public Builder clone() {
return create().mergeFrom(result);
}
public com.google.protobuf.Descriptors.Descriptor
getDescriptorForType() {
return se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO.getDescriptor();
}
public se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO getDefaultInstanceForType() {
return se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO.getDefaultInstance();
}
public boolean isInitialized() {
return result.isInitialized();
}
public se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO build() {
if (result != null && !isInitialized()) {
throw newUninitializedMessageException(result);
}
return buildPartial();
}
private se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO buildParsed()
throws com.google.protobuf.InvalidProtocolBufferException {
if (!isInitialized()) {
throw newUninitializedMessageException(
result).asInvalidProtocolBufferException();
}
return buildPartial();
}
public se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO buildPartial() {
if (result == null) {
throw new IllegalStateException(
"build() has already been called on this Builder.");
}
se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO returnMe = result;
result = null;
return returnMe;
}
public Builder mergeFrom(com.google.protobuf.Message other) {
if (other instanceof se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO) {
return mergeFrom((se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO)other);
} else {
super.mergeFrom(other);
return this;
}
}
public Builder mergeFrom(se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO other) {
if (other == se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO.getDefaultInstance()) return this;
if (other.hasId()) {
setId(other.getId());
}
if (other.hasName()) {
setName(other.getName());
}
if (other.hasStatus()) {
setStatus(other.getStatus());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
public Builder mergeFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
com.google.protobuf.UnknownFieldSet.Builder unknownFields =
com.google.protobuf.UnknownFieldSet.newBuilder(
this.getUnknownFields());
while (true) {
int tag = input.readTag();
switch (tag) {
case 0:
this.setUnknownFields(unknownFields.build());
return this;
default: {
if (!parseUnknownField(input, unknownFields,
extensionRegistry, tag)) {
this.setUnknownFields(unknownFields.build());
return this;
}
break;
}
case 8: {
setId(input.readUInt64());
break;
}
case 18: {
setName(input.readString());
break;
}
case 24: {
setStatus(input.readBool());
break;
}
}
}
}
// required uint64 id = 1;
public boolean hasId() {
return result.hasId();
}
public long getId() {
return result.getId();
}
public Builder setId(long value) {
result.hasId = true;
result.id_ = value;
return this;
}
public Builder clearId() {
result.hasId = false;
result.id_ = 0L;
return this;
}
// required string name = 2;
public boolean hasName() {
return result.hasName();
}
public java.lang.String getName() {
return result.getName();
}
public Builder setName(java.lang.String value) {
if (value == null) {
throw new NullPointerException();
}
result.hasName = true;
result.name_ = value;
return this;
}
public Builder clearName() {
result.hasName = false;
result.name_ = getDefaultInstance().getName();
return this;
}
// required bool status = 3;
public boolean hasStatus() {
return result.hasStatus();
}
public boolean getStatus() {
return result.getStatus();
}
public Builder setStatus(boolean value) {
result.hasStatus = true;
result.status_ = value;
return this;
}
public Builder clearStatus() {
result.hasStatus = false;
result.status_ = false;
return this;
}
}
static {
se.scalablesolutions.akka.actor.ProtobufProtocol.getDescriptor();
}
static {
se.scalablesolutions.akka.actor.ProtobufProtocol.internalForceInit();
}
}
private static com.google.protobuf.Descriptors.Descriptor
internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_fieldAccessorTable;
public static com.google.protobuf.Descriptors.FileDescriptor
getDescriptor() {
return descriptor;
}
private static com.google.protobuf.Descriptors.FileDescriptor
descriptor;
static {
java.lang.String[] descriptorData = {
"\n\026ProtobufProtocol.proto\022\037se.scalablesol" +
"utions.akka.actor\"8\n\014ProtobufPOJO\022\n\n\002id\030" +
"\001 \002(\004\022\014\n\004name\030\002 \002(\t\022\016\n\006status\030\003 \002(\010"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
public com.google.protobuf.ExtensionRegistry assignDescriptors(
com.google.protobuf.Descriptors.FileDescriptor root) {
descriptor = root;
internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_descriptor =
getDescriptor().getMessageTypes().get(0);
internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_descriptor,
new java.lang.String[] { "Id", "Name", "Status", },
se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO.class,
se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO.Builder.class);
return null;
}
};
com.google.protobuf.Descriptors.FileDescriptor
.internalBuildGeneratedFileFrom(descriptorData,
new com.google.protobuf.Descriptors.FileDescriptor[] {
}, assigner);
}
public static void internalForceInit() {}
}

View file

@ -0,0 +1,72 @@
package se.scalablesolutions.akka.actor
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.scalatest.junit.JUnitSuite
import org.junit.{Test, Before, After}
import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient}
import se.scalablesolutions.akka.dispatch.Dispatchers
import se.scalablesolutions.akka.serialization.Serializable.Protobuf
import ProtobufProtocol.ProtobufPOJO
/* ---------------------------
Uses this Protobuf message:
message ProtobufPOJO {
required uint64 id = 1;
required string name = 2;
required bool status = 3;
}
--------------------------- */
object ProtobufActorMessageSerializationSpec {
val unit = TimeUnit.MILLISECONDS
val HOSTNAME = "localhost"
val PORT = 9990
var server: RemoteServer = null
class RemoteActorSpecActorBidirectional extends Actor {
start
def receive = {
case pojo: ProtobufPOJO =>
val id = pojo.getId
reply(id + 1)
case msg =>
throw new RuntimeException("Expected a ProtobufPOJO message but got: " + msg)
}
}
}
class ProtobufActorMessageSerializationSpec extends JUnitSuite {
import ProtobufActorMessageSerializationSpec._
@Before
def init() {
server = new RemoteServer
server.start(HOSTNAME, PORT)
server.register("RemoteActorSpecActorBidirectional", new RemoteActorSpecActorBidirectional)
Thread.sleep(1000)
}
// make sure the servers shutdown cleanly after the test has finished
@After
def finished() {
server.shutdown
RemoteClient.shutdownAll
Thread.sleep(1000)
}
@Test
def shouldSendReplyAsync = {
val actor = RemoteClient.actorFor("RemoteActorSpecActorBidirectional", 5000L, HOSTNAME, PORT)
val result = actor !! ProtobufPOJO.newBuilder
.setId(11)
.setStatus(true)
.setName("Coltrane")
.build
assert(12L === result.get.asInstanceOf[Long])
actor.stop
}
}