diff --git a/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala b/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala index 65558dd997..cae7f151b4 100644 --- a/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala +++ b/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala @@ -17,7 +17,6 @@ object RemoteProtocolBuilder { private var SERIALIZER_SBINARY: Serializer.SBinary = Serializer.SBinary private var SERIALIZER_PROTOBUF: Serializer.Protobuf = Serializer.Protobuf - def setClassLoader(cl: ClassLoader) = { SERIALIZER_JAVA.classLoader = Some(cl) SERIALIZER_JAVA_JSON.classLoader = Some(cl) @@ -26,6 +25,8 @@ object RemoteProtocolBuilder { def getMessage(request: RemoteRequest): Any = { request.getProtocol match { + case SerializationProtocol.JAVA => + unbox(SERIALIZER_JAVA.in(request.getMessage.toByteArray, None)) case SerializationProtocol.SBINARY => val renderer = Class.forName(new String(request.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]] renderer.fromBytes(request.getMessage.toByteArray) @@ -36,17 +37,19 @@ object RemoteProtocolBuilder { val manifest = SERIALIZER_JAVA.in(request.getMessageManifest.toByteArray, None).asInstanceOf[String] SERIALIZER_JAVA_JSON.in(request.getMessage.toByteArray, Some(Class.forName(manifest))) case SerializationProtocol.PROTOBUF => + val messageClass = SERIALIZER_JAVA.in(request.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]] + val protobufMessage = messageClass.newInstance.asInstanceOf[Serializable.Protobuf[_]] + protobufMessage.fromBytes(request.getMessage.toByteArray) + case SerializationProtocol.PROTOBUF_RAW => val messageClass = SERIALIZER_JAVA.in(request.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]] SERIALIZER_PROTOBUF.in(request.getMessage.toByteArray, Some(messageClass)) - case SerializationProtocol.JAVA => - unbox(SERIALIZER_JAVA.in(request.getMessage.toByteArray, None)) - case SerializationProtocol.AVRO => - throw new UnsupportedOperationException("Avro protocol is not yet supported") } } def getMessage(reply: RemoteReply): Any = { reply.getProtocol match { + case SerializationProtocol.JAVA => + unbox(SERIALIZER_JAVA.in(reply.getMessage.toByteArray, None)) case SerializationProtocol.SBINARY => val renderer = Class.forName(new String(reply.getMessageManifest.toByteArray)).newInstance.asInstanceOf[SBinary[_ <: AnyRef]] renderer.fromBytes(reply.getMessage.toByteArray) @@ -57,12 +60,12 @@ object RemoteProtocolBuilder { val manifest = SERIALIZER_JAVA.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[String] SERIALIZER_JAVA_JSON.in(reply.getMessage.toByteArray, Some(Class.forName(manifest))) case SerializationProtocol.PROTOBUF => + val messageClass = SERIALIZER_JAVA.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]] + val protobufMessage = messageClass.newInstance.asInstanceOf[Serializable.Protobuf[_]] + protobufMessage.fromBytes(reply.getMessage.toByteArray) + case SerializationProtocol.PROTOBUF_RAW => val messageClass = SERIALIZER_JAVA.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]] SERIALIZER_PROTOBUF.in(reply.getMessage.toByteArray, Some(messageClass)) - case SerializationProtocol.JAVA => - unbox(SERIALIZER_JAVA.in(reply.getMessage.toByteArray, None)) - case SerializationProtocol.AVRO => - throw new UnsupportedOperationException("Avro protocol is not yet supported") } } @@ -72,9 +75,14 @@ object RemoteProtocolBuilder { builder.setProtocol(SerializationProtocol.SBINARY) builder.setMessage(ByteString.copyFrom(serializable.toBytes)) builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes)) + } else if (message.isInstanceOf[Serializable.Protobuf[_]]) { + val serializable = message.asInstanceOf[Serializable.Protobuf[_]] + builder.setProtocol(SerializationProtocol.PROTOBUF) + builder.setMessage(ByteString.copyFrom(serializable.getMessage.toByteArray)) + builder.setMessageManifest(ByteString.copyFrom(SERIALIZER_JAVA.out(serializable.getClass))) } else if (message.isInstanceOf[Message]) { val serializable = message.asInstanceOf[Message] - builder.setProtocol(SerializationProtocol.PROTOBUF) + builder.setProtocol(SerializationProtocol.PROTOBUF_RAW) builder.setMessage(ByteString.copyFrom(serializable.toByteArray)) builder.setMessageManifest(ByteString.copyFrom(SERIALIZER_JAVA.out(serializable.getClass))) } else if (message.isInstanceOf[Serializable.ScalaJSON]) { @@ -100,9 +108,14 @@ object RemoteProtocolBuilder { builder.setProtocol(SerializationProtocol.SBINARY) builder.setMessage(ByteString.copyFrom(serializable.toBytes)) builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes)) + } else if (message.isInstanceOf[Serializable.Protobuf[_]]) { + val serializable = message.asInstanceOf[Serializable.Protobuf[_]] + builder.setProtocol(SerializationProtocol.PROTOBUF) + builder.setMessage(ByteString.copyFrom(serializable.getMessage.toByteArray)) + builder.setMessageManifest(ByteString.copyFrom(SERIALIZER_JAVA.out(serializable.getClass))) } else if (message.isInstanceOf[Message]) { val serializable = message.asInstanceOf[Message] - builder.setProtocol(SerializationProtocol.PROTOBUF) + builder.setProtocol(SerializationProtocol.PROTOBUF_RAW) builder.setMessage(ByteString.copyFrom(serializable.toByteArray)) builder.setMessageManifest(ByteString.copyFrom(SERIALIZER_JAVA.out(serializable.getClass))) } else if (message.isInstanceOf[Serializable.ScalaJSON]) { diff --git a/akka-core/src/main/scala/serialization/Serializable.scala b/akka-core/src/main/scala/serialization/Serializable.scala index d0a199f67b..0f9bcc4f75 100644 --- a/akka-core/src/main/scala/serialization/Serializable.scala +++ b/akka-core/src/main/scala/serialization/Serializable.scala @@ -16,12 +16,12 @@ import java.io.{StringWriter, ByteArrayOutputStream, ObjectOutputStream} import sjson.json.{Serializer=>SJSONSerializer} object SerializationProtocol { + val JAVA = 0 val SBINARY = 1 val SCALA_JSON = 2 val JAVA_JSON = 3 val PROTOBUF = 4 - val JAVA = 5 - val AVRO = 6 + val PROTOBUF_RAW = 5 } /** diff --git a/akka-core/src/test/java/ProtobufProtocol.proto b/akka-core/src/test/java/ProtobufProtocol.proto new file mode 100644 index 0000000000..f4b146506c --- /dev/null +++ b/akka-core/src/test/java/ProtobufProtocol.proto @@ -0,0 +1,17 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.actor; + +/* + Compile with: + cd ./akka-core/src/test/java + protoc ProtobufProtocol.proto --java_out . +*/ + +message ProtobufPOJO { + required uint64 id = 1; + required string name = 2; + required bool status = 3; +} diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/ProtobufProtocol.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/ProtobufProtocol.java new file mode 100644 index 0000000000..9995225cf5 --- /dev/null +++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/ProtobufProtocol.java @@ -0,0 +1,402 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! + +package se.scalablesolutions.akka.actor; + +public final class ProtobufProtocol { + private ProtobufProtocol() {} + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistry registry) { + } + public static final class ProtobufPOJO extends + com.google.protobuf.GeneratedMessage { + // Use ProtobufPOJO.newBuilder() to construct. + private ProtobufPOJO() {} + + private static final ProtobufPOJO defaultInstance = new ProtobufPOJO(); + public static ProtobufPOJO getDefaultInstance() { + return defaultInstance; + } + + public ProtobufPOJO getDefaultInstanceForType() { + return defaultInstance; + } + + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return se.scalablesolutions.akka.actor.ProtobufProtocol.internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return se.scalablesolutions.akka.actor.ProtobufProtocol.internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_fieldAccessorTable; + } + + // required uint64 id = 1; + public static final int ID_FIELD_NUMBER = 1; + private boolean hasId; + private long id_ = 0L; + public boolean hasId() { return hasId; } + public long getId() { return id_; } + + // required string name = 2; + public static final int NAME_FIELD_NUMBER = 2; + private boolean hasName; + private java.lang.String name_ = ""; + public boolean hasName() { return hasName; } + public java.lang.String getName() { return name_; } + + // required bool status = 3; + public static final int STATUS_FIELD_NUMBER = 3; + private boolean hasStatus; + private boolean status_ = false; + public boolean hasStatus() { return hasStatus; } + public boolean getStatus() { return status_; } + + public final boolean isInitialized() { + if (!hasId) return false; + if (!hasName) return false; + if (!hasStatus) return false; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + if (hasId()) { + output.writeUInt64(1, getId()); + } + if (hasName()) { + output.writeString(2, getName()); + } + if (hasStatus()) { + output.writeBool(3, getStatus()); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (hasId()) { + size += com.google.protobuf.CodedOutputStream + .computeUInt64Size(1, getId()); + } + if (hasName()) { + size += com.google.protobuf.CodedOutputStream + .computeStringSize(2, getName()); + } + if (hasStatus()) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(3, getStatus()); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeDelimitedFrom(input).buildParsed(); + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeDelimitedFrom(input, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder { + private se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO result; + + // Construct using se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO.newBuilder() + private Builder() {} + + private static Builder create() { + Builder builder = new Builder(); + builder.result = new se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO(); + return builder; + } + + protected se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO internalGetResult() { + return result; + } + + public Builder clear() { + if (result == null) { + throw new IllegalStateException( + "Cannot call clear() after build()."); + } + result = new se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO(); + return this; + } + + public Builder clone() { + return create().mergeFrom(result); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO.getDescriptor(); + } + + public se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO getDefaultInstanceForType() { + return se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO.getDefaultInstance(); + } + + public boolean isInitialized() { + return result.isInitialized(); + } + public se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO build() { + if (result != null && !isInitialized()) { + throw newUninitializedMessageException(result); + } + return buildPartial(); + } + + private se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + if (!isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return buildPartial(); + } + + public se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO buildPartial() { + if (result == null) { + throw new IllegalStateException( + "build() has already been called on this Builder."); + } + se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO returnMe = result; + result = null; + return returnMe; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO) { + return mergeFrom((se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO other) { + if (other == se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO.getDefaultInstance()) return this; + if (other.hasId()) { + setId(other.getId()); + } + if (other.hasName()) { + setName(other.getName()); + } + if (other.hasStatus()) { + setStatus(other.getStatus()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder( + this.getUnknownFields()); + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + this.setUnknownFields(unknownFields.build()); + return this; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + this.setUnknownFields(unknownFields.build()); + return this; + } + break; + } + case 8: { + setId(input.readUInt64()); + break; + } + case 18: { + setName(input.readString()); + break; + } + case 24: { + setStatus(input.readBool()); + break; + } + } + } + } + + + // required uint64 id = 1; + public boolean hasId() { + return result.hasId(); + } + public long getId() { + return result.getId(); + } + public Builder setId(long value) { + result.hasId = true; + result.id_ = value; + return this; + } + public Builder clearId() { + result.hasId = false; + result.id_ = 0L; + return this; + } + + // required string name = 2; + public boolean hasName() { + return result.hasName(); + } + public java.lang.String getName() { + return result.getName(); + } + public Builder setName(java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + result.hasName = true; + result.name_ = value; + return this; + } + public Builder clearName() { + result.hasName = false; + result.name_ = getDefaultInstance().getName(); + return this; + } + + // required bool status = 3; + public boolean hasStatus() { + return result.hasStatus(); + } + public boolean getStatus() { + return result.getStatus(); + } + public Builder setStatus(boolean value) { + result.hasStatus = true; + result.status_ = value; + return this; + } + public Builder clearStatus() { + result.hasStatus = false; + result.status_ = false; + return this; + } + } + + static { + se.scalablesolutions.akka.actor.ProtobufProtocol.getDescriptor(); + } + + static { + se.scalablesolutions.akka.actor.ProtobufProtocol.internalForceInit(); + } + } + + private static com.google.protobuf.Descriptors.Descriptor + internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_fieldAccessorTable; + + public static com.google.protobuf.Descriptors.FileDescriptor + getDescriptor() { + return descriptor; + } + private static com.google.protobuf.Descriptors.FileDescriptor + descriptor; + static { + java.lang.String[] descriptorData = { + "\n\026ProtobufProtocol.proto\022\037se.scalablesol" + + "utions.akka.actor\"8\n\014ProtobufPOJO\022\n\n\002id\030" + + "\001 \002(\004\022\014\n\004name\030\002 \002(\t\022\016\n\006status\030\003 \002(\010" + }; + com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = + new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { + public com.google.protobuf.ExtensionRegistry assignDescriptors( + com.google.protobuf.Descriptors.FileDescriptor root) { + descriptor = root; + internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_descriptor = + getDescriptor().getMessageTypes().get(0); + internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_se_scalablesolutions_akka_actor_ProtobufPOJO_descriptor, + new java.lang.String[] { "Id", "Name", "Status", }, + se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO.class, + se.scalablesolutions.akka.actor.ProtobufProtocol.ProtobufPOJO.Builder.class); + return null; + } + }; + com.google.protobuf.Descriptors.FileDescriptor + .internalBuildGeneratedFileFrom(descriptorData, + new com.google.protobuf.Descriptors.FileDescriptor[] { + }, assigner); + } + + public static void internalForceInit() {} +} diff --git a/akka-core/src/test/scala/ProtobufActorMessageSerializationSpec.scala b/akka-core/src/test/scala/ProtobufActorMessageSerializationSpec.scala new file mode 100644 index 0000000000..cde44b3016 --- /dev/null +++ b/akka-core/src/test/scala/ProtobufActorMessageSerializationSpec.scala @@ -0,0 +1,72 @@ +package se.scalablesolutions.akka.actor + +import java.util.concurrent.{CountDownLatch, TimeUnit} +import org.scalatest.junit.JUnitSuite +import org.junit.{Test, Before, After} + +import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient} +import se.scalablesolutions.akka.dispatch.Dispatchers +import se.scalablesolutions.akka.serialization.Serializable.Protobuf + +import ProtobufProtocol.ProtobufPOJO + +/* --------------------------- +Uses this Protobuf message: + +message ProtobufPOJO { + required uint64 id = 1; + required string name = 2; + required bool status = 3; +} +--------------------------- */ + +object ProtobufActorMessageSerializationSpec { + val unit = TimeUnit.MILLISECONDS + val HOSTNAME = "localhost" + val PORT = 9990 + var server: RemoteServer = null + + class RemoteActorSpecActorBidirectional extends Actor { + start + def receive = { + case pojo: ProtobufPOJO => + val id = pojo.getId + reply(id + 1) + case msg => + throw new RuntimeException("Expected a ProtobufPOJO message but got: " + msg) + } + } +} + +class ProtobufActorMessageSerializationSpec extends JUnitSuite { + import ProtobufActorMessageSerializationSpec._ + + @Before + def init() { + server = new RemoteServer + server.start(HOSTNAME, PORT) + server.register("RemoteActorSpecActorBidirectional", new RemoteActorSpecActorBidirectional) + Thread.sleep(1000) + } + + // make sure the servers shutdown cleanly after the test has finished + @After + def finished() { + server.shutdown + RemoteClient.shutdownAll + Thread.sleep(1000) + } + + @Test + def shouldSendReplyAsync = { + val actor = RemoteClient.actorFor("RemoteActorSpecActorBidirectional", 5000L, HOSTNAME, PORT) + val result = actor !! ProtobufPOJO.newBuilder + .setId(11) + .setStatus(true) + .setName("Coltrane") + .build + assert(12L === result.get.asInstanceOf[Long]) + actor.stop + } +} + \ No newline at end of file