diff --git a/akka-actor-tests/src/main/scala/akka/testing/Serializers.scala b/akka-actor-tests/src/main/scala/akka/testing/Serializers.scala new file mode 100644 index 0000000000..0a7aa8fb7b --- /dev/null +++ b/akka-actor-tests/src/main/scala/akka/testing/Serializers.scala @@ -0,0 +1,69 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.testing + +import akka.serialization.Serializer +import com.google.protobuf.Message +import org.codehaus.jackson.map.ObjectMapper +import java.io.{ ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, ByteArrayInputStream } +import akka.util.ClassLoaderObjectInputStream +import sjson.json._ + +class ProtobufSerializer extends Serializer { + val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]]) + + def toBinary(obj: AnyRef): Array[Byte] = { + if (!obj.isInstanceOf[Message]) throw new IllegalArgumentException( + "Can't serialize a non-protobuf message using protobuf [" + obj + "]") + obj.asInstanceOf[Message].toByteArray + } + + def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]], classLoader: Option[ClassLoader] = None): AnyRef = { + if (!clazz.isDefined) throw new IllegalArgumentException( + "Need a protobuf message class to be able to serialize bytes using protobuf") + clazz.get.getDeclaredMethod("parseFrom", ARRAY_OF_BYTE_ARRAY: _*).invoke(null, bytes).asInstanceOf[Message] + } +} +object ProtobufSerializer extends ProtobufSerializer + +class JavaJSONSerializer extends Serializer { + private val mapper = new ObjectMapper + + def toBinary(obj: AnyRef): Array[Byte] = { + val bos = new ByteArrayOutputStream + val out = new ObjectOutputStream(bos) + mapper.writeValue(out, obj) + out.close + bos.toByteArray + } + + def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]], classLoader: Option[ClassLoader] = None): AnyRef = { + if (!clazz.isDefined) throw new IllegalArgumentException( + "Can't deserialize JSON to instance if no class is provided") + val in = + if (classLoader.isDefined) new ClassLoaderObjectInputStream(classLoader.get, new ByteArrayInputStream(bytes)) + else new ObjectInputStream(new ByteArrayInputStream(bytes)) + val obj = mapper.readValue(in, clazz.get).asInstanceOf[AnyRef] + in.close + obj + } +} +object JavaJSONSerializer extends JavaJSONSerializer + +class SJSONSerializer extends Serializer { + + def toBinary(obj: AnyRef): Array[Byte] = + sjson.json.Serializer.SJSON.out(obj) + + def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]], cl: Option[ClassLoader] = None): AnyRef = { + if (!clazz.isDefined) throw new IllegalArgumentException( + "Can't deserialize JSON to instance if no class is provided") + + import sjson.json.Serializer._ + val sj = new SJSON with DefaultConstructor { val classLoader = cl } + sj.in(bytes, clazz.get.getName) + } +} +object SJSONSerializer extends SJSONSerializer diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala new file mode 100644 index 0000000000..48e43a5e6d --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.serialization + +import org.scalatest.junit.JUnitSuite +import org.junit.Test +import akka.serialization.Serialization._ +import scala.reflect._ + +object SerializeSpec { + @BeanInfo + case class Address(no: String, street: String, city: String, zip: String) { def this() = this("", "", "", "") } + @BeanInfo + case class Person(name: String, age: Int, address: Address) { def this() = this("", 0, null) } + + case class Record(id: Int, person: Person) +} + +class SerializeSpec extends JUnitSuite { + import SerializeSpec._ + + @Test + def shouldSerializeAddress { + val addr = Address("120", "Monroe Street", "Santa Clara", "95050") + val b = serialize(addr) match { + case Left(exception) ⇒ fail(exception) + case Right(bytes) ⇒ bytes + } + deserialize(b.asInstanceOf[Array[Byte]], classOf[Address], None) match { + case Left(exception) ⇒ fail(exception) + case Right(add) ⇒ assert(add === addr) + } + } + + @Test + def shouldSerializePerson { + val person = Person("debasish ghosh", 25, Address("120", "Monroe Street", "Santa Clara", "95050")) + val b = serialize(person) match { + case Left(exception) ⇒ fail(exception) + case Right(bytes) ⇒ bytes + } + deserialize(b.asInstanceOf[Array[Byte]], classOf[Person], None) match { + case Left(exception) ⇒ fail(exception) + case Right(p) ⇒ assert(p === person) + } + } + + @Test + def shouldSerializeRecordWithDefaultSerializer { + val person = Person("debasish ghosh", 25, Address("120", "Monroe Street", "Santa Clara", "95050")) + val r = Record(100, person) + val b = serialize(r) match { + case Left(exception) ⇒ fail(exception) + case Right(bytes) ⇒ bytes + } + deserialize(b.asInstanceOf[Array[Byte]], classOf[Record], None) match { + case Left(exception) ⇒ fail(exception) + case Right(p) ⇒ assert(p === r) + } + } +} diff --git a/akka-actor-tests/src/test/scala/akka/serialization/akka-serializer.conf b/akka-actor-tests/src/test/scala/akka/serialization/akka-serializer.conf new file mode 100644 index 0000000000..324adb5c96 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/serialization/akka-serializer.conf @@ -0,0 +1,16 @@ +akka { + actor { + serializers { + java = "akka.serialization.JavaSerializer" + proto = "akka.testing.ProtobufSerializer" + sjson = "akka.testing.SJSONSerializer" + default = "akka.serialization.JavaSerializer" + } + + bindings { + java = ["akka.serialization.SerializeSpec$Address", "akka.serialization.MyJavaSerializableActor", "akka.serialization.MyStatelessActorWithMessagesInMailbox", "akka.serialization.MyActorWithProtobufMessagesInMailbox"] + sjson = ["akka.serialization.SerializeSpec$Person"] + proto = ["com.google.protobuf.Message", "akka.actor.ProtobufProtocol$MyMessage"] + } + } +} diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index a8aab54a90..adbfa0b115 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -397,23 +397,28 @@ object Actor extends ListenerManagement { "] for serialization of actor [" + address + "] since " + reason) - val serializer: Serializer = serializerClassName match { - case null | "" | Format.`defaultSerializerName` ⇒ Format.Default - case specialSerializer ⇒ - ReflectiveAccess.getClassFor(specialSerializer) match { - case Right(clazz) ⇒ - clazz.newInstance match { - case s: Serializer ⇒ s - case other ⇒ serializerErrorDueTo("class must be of type [akka.serialization.Serializer]") - } - case Left(exception) ⇒ - val cause = exception match { - case i: InvocationTargetException ⇒ i.getTargetException - case _ ⇒ exception - } - serializerErrorDueTo(cause.toString) - } - } + val serializer: Serializer = + akka.serialization.Serialization.getSerializer(this.getClass).fold(x ⇒ serializerErrorDueTo(x.toString), s ⇒ s) + + /** + * val serializer: Serializer = serializerClassName match { + * case null | "" | Format.`defaultSerializerName` ⇒ Format.Default + * case specialSerializer ⇒ + * ReflectiveAccess.getClassFor(specialSerializer) match { + * case Right(clazz) ⇒ + * clazz.newInstance match { + * case s: Serializer ⇒ s + * case other ⇒ serializerErrorDueTo("class must be of type [akka.serialization.Serializer]") + * } + * case Left(exception) ⇒ + * val cause = exception match { + * case i: InvocationTargetException ⇒ i.getTargetException + * case _ ⇒ exception + * } + * serializerErrorDueTo(cause.toString) + * } + * } + */ val isStateful = state match { case _: Stateless | Stateless ⇒ false diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 9da6df072f..8550a54f90 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -13,7 +13,7 @@ import akka.actor.DeploymentConfig._ import akka.config.{ ConfigurationException, Config } import akka.routing.RouterType import akka.util.ReflectiveAccess._ -import akka.serialization.Format +import akka.serialization._ import akka.AkkaException /** @@ -31,7 +31,7 @@ object DeploymentConfig { case class Deploy( address: String, routing: Routing = Direct, - format: String = Format.defaultSerializerName, + format: String = Serializer.defaultSerializerName, // Format.defaultSerializerName, scope: Scope = Local) // -------------------------------- @@ -263,7 +263,7 @@ object Deployer { // -------------------------------- val addressPath = "akka.actor.deployment." + address Config.config.getSection(addressPath) match { - case None ⇒ Some(Deploy(address, Direct, Format.defaultSerializerName, Local)) + case None ⇒ Some(Deploy(address, Direct, Serializer.defaultSerializerName, Local)) case Some(addressConfig) ⇒ // -------------------------------- @@ -290,14 +290,14 @@ object Deployer { // -------------------------------- // akka.actor.deployment.
.format // -------------------------------- - val format = addressConfig.getString("format", Format.defaultSerializerName) + val format = addressConfig.getString("format", Serializer.defaultSerializerName) // -------------------------------- // akka.actor.deployment.
.clustered // -------------------------------- addressConfig.getSection("clustered") match { case None ⇒ - Some(Deploy(address, router, Format.defaultSerializerName, Local)) // deploy locally + Some(Deploy(address, router, Serializer.defaultSerializerName, Local)) // deploy locally case Some(clusteredConfig) ⇒ diff --git a/akka-actor/src/main/scala/akka/config/Config.scala b/akka-actor/src/main/scala/akka/config/Config.scala index 16daea4c88..5e123ca345 100644 --- a/akka-actor/src/main/scala/akka/config/Config.scala +++ b/akka-actor/src/main/scala/akka/config/Config.scala @@ -119,4 +119,10 @@ object Config { val startTime = System.currentTimeMillis def uptime = (System.currentTimeMillis - startTime) / 1000 + + val serializers = config.getSection("akka.actor.serializers").map(_.map).getOrElse(Map("default" -> "akka.serialization.JavaSerializer")) + val bindings = config.getSection("akka.actor.bindings") + .map(_.map) + .map(m ⇒ Map() ++ m.map { case (k, v: List[String]) ⇒ Map() ++ v.map((_, k)) }.flatten) + val serializerMap = bindings.map(m ⇒ m.map { case (k, v: String) ⇒ (k, serializers(v)) }).getOrElse(Map()) } diff --git a/akka-actor/src/main/scala/akka/serialization/Format.scala b/akka-actor/src/main/scala/akka/serialization/Format.scala index 40b0d5dbc2..0e221c9b01 100644 --- a/akka-actor/src/main/scala/akka/serialization/Format.scala +++ b/akka-actor/src/main/scala/akka/serialization/Format.scala @@ -10,44 +10,44 @@ import java.io.{ ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, B /** * @author Jonas Bonér + * trait Serializer extends scala.Serializable { + * @volatile + * var classLoader: Option[ClassLoader] = None + * def deepClone(obj: AnyRef): AnyRef = fromBinary(toBinary(obj), Some(obj.getClass)) + * + * def toBinary(obj: AnyRef): Array[Byte] + * def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef + * } */ -trait Serializer extends scala.Serializable { - @volatile - var classLoader: Option[ClassLoader] = None - def deepClone(obj: AnyRef): AnyRef = fromBinary(toBinary(obj), Some(obj.getClass)) - - def toBinary(obj: AnyRef): Array[Byte] - def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef -} /** * + * object Format { + * implicit object Default extends Serializer { + * import java.io.{ ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, ByteArrayInputStream } + * //import org.apache.commons.io.input.ClassLoaderObjectInputStream + * + * def toBinary(obj: AnyRef): Array[Byte] = { + * val bos = new ByteArrayOutputStream + * val out = new ObjectOutputStream(bos) + * out.writeObject(obj) + * out.close() + * bos.toByteArray + * } + * + * def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]], classLoader: Option[ClassLoader] = None): AnyRef = { + * val in = + * //if (classLoader.isDefined) new ClassLoaderObjectInputStream(classLoader.get, new ByteArrayInputStream(bytes)) else + * new ObjectInputStream(new ByteArrayInputStream(bytes)) + * val obj = in.readObject + * in.close() + * obj + * } + * } + * + * val defaultSerializerName = Default.getClass.getName + * } */ -object Format { - implicit object Default extends Serializer { - import java.io.{ ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, ByteArrayInputStream } - //import org.apache.commons.io.input.ClassLoaderObjectInputStream - - def toBinary(obj: AnyRef): Array[Byte] = { - val bos = new ByteArrayOutputStream - val out = new ObjectOutputStream(bos) - out.writeObject(obj) - out.close() - bos.toByteArray - } - - def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = { - val in = - //if (classLoader.isDefined) new ClassLoaderObjectInputStream(classLoader.get, new ByteArrayInputStream(bytes)) else - new ObjectInputStream(new ByteArrayInputStream(bytes)) - val obj = in.readObject - in.close() - obj - } - } - - val defaultSerializerName = Default.getClass.getName -} trait FromBinary[T <: Actor] { def fromBinary(bytes: Array[Byte], act: T): T diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala new file mode 100644 index 0000000000..206a4296df --- /dev/null +++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala @@ -0,0 +1,76 @@ +package akka.serialization + +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +import akka.util.ReflectiveAccess._ +import akka.config.Config +import akka.config.Config._ +import akka.actor.{ ActorRef, Actor } + +object Serialization { + case class NoSerializerFoundException(m: String) extends Exception(m) + + def serialize(o: AnyRef): Either[Exception, Array[Byte]] = + getSerializer(o.getClass) + .fold((ex) ⇒ Left(ex), + (ser) ⇒ Right(ser.toBinary(o))) + + def deserialize(bytes: Array[Byte], clazz: Class[_], + classLoader: Option[ClassLoader]): Either[Exception, AnyRef] = + getSerializer(clazz) + .fold((ex) ⇒ Left(ex), + (ser) ⇒ Right(ser.fromBinary(bytes, Some(clazz), classLoader))) + + def getSerializer(clazz: Class[_]): Either[Exception, Serializer] = { + Config.serializerMap.get(clazz.getName) match { + case Some(serializerName: String) ⇒ + getClassFor(serializerName) match { + case Right(serializer) ⇒ { + Right(serializer.newInstance.asInstanceOf[Serializer]) + } + case Left(exception) ⇒ Left(exception) + } + case _ ⇒ + getDefaultSerializer match { + case Some(s: Serializer) ⇒ Right(s) + case None ⇒ Left(new Exception("No default serializer found for " + clazz)) + } + } + } + + private def getDefaultSerializer = { + Config.serializers.get("default") match { + case Some(ser: String) ⇒ + getClassFor(ser) match { + case Right(srializer) ⇒ { + Some(srializer.newInstance.asInstanceOf[Serializer]) + } + case Left(exception) ⇒ None + } + case None ⇒ None + } + } + + private def getSerializerInstanceForBestMatchClass(configMap: collection.mutable.Map[String, String], cl: Class[_]) = { + configMap + .find { + case (clazzName, ser) ⇒ + getClassFor(clazzName) match { + case Right(clazz) ⇒ + clazz.isAssignableFrom(cl) + case _ ⇒ false + } + } + .map { + case (_, ser) ⇒ + getClassFor(ser) match { + case Right(s) ⇒ + val instance = s.newInstance.asInstanceOf[Serializer] + Right(instance) + case _ ⇒ Left(new Exception("Error instantiating " + ser)) + } + }.getOrElse(Left(NoSerializerFoundException("No mapping serializer found for " + cl))) + } +} diff --git a/akka-actor/src/main/scala/akka/serialization/Serializer.scala b/akka-actor/src/main/scala/akka/serialization/Serializer.scala new file mode 100644 index 0000000000..7ac3eea2df --- /dev/null +++ b/akka-actor/src/main/scala/akka/serialization/Serializer.scala @@ -0,0 +1,39 @@ +package akka.serialization + +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +import java.io.{ ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, ByteArrayInputStream } +import akka.util.ClassLoaderObjectInputStream +import akka.actor.ActorRef + +trait Serializer extends scala.Serializable { + def toBinary(o: AnyRef): Array[Byte] + def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]] = None, classLoader: Option[ClassLoader] = None): AnyRef +} + +class JavaSerializer extends Serializer { + def toBinary(o: AnyRef): Array[Byte] = { + val bos = new ByteArrayOutputStream + val out = new ObjectOutputStream(bos) + out.writeObject(o) + out.close() + bos.toByteArray + } + + def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]] = None, + classLoader: Option[ClassLoader] = None): AnyRef = { + val in = + if (classLoader.isDefined) new ClassLoaderObjectInputStream(classLoader.get, new ByteArrayInputStream(bytes)) else + new ObjectInputStream(new ByteArrayInputStream(bytes)) + val obj = in.readObject + in.close() + obj + } +} + +object JavaSerializer extends JavaSerializer +object Serializer { + val defaultSerializerName = JavaSerializer.getClass.getName +} diff --git a/akka-actor/src/main/scala/akka/util/ClassLoaderObjectInputStream.scala b/akka-actor/src/main/scala/akka/util/ClassLoaderObjectInputStream.scala new file mode 100644 index 0000000000..1749c118d0 --- /dev/null +++ b/akka-actor/src/main/scala/akka/util/ClassLoaderObjectInputStream.scala @@ -0,0 +1,16 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.util + +import java.io.{ InputStream, ObjectInputStream, ObjectStreamClass } + +class ClassLoaderObjectInputStream(classLoader: ClassLoader, is: InputStream) extends ObjectInputStream(is) { + override protected def resolveClass(objectStreamClass: ObjectStreamClass): Class[_] = { + Class.forName(objectStreamClass.getName, false, classLoader) match { + case null ⇒ super.resolveClass(objectStreamClass) + case clazz ⇒ clazz + } + } +} diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 37bff63270..3fbc35492d 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -36,7 +36,7 @@ import akka.routing.RouterType import akka.config.{ Config, Supervision } import Supervision._ import Config._ -import akka.serialization.{ Format, Serializers, Serializer, Compression } +import akka.serialization.{ Format, Serializer, Compression } import Compression.LZF import akka.AkkaException @@ -524,7 +524,7 @@ class DefaultClusterNode private[akka] ( * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly * available durable store. */ - def store(actorRef: ActorRef, replicationFactor: Int, serializeMailbox: Boolean, format: Serializer): ClusterNode = if (isConnected.isOn) { + def store(actorRef: ActorRef, replicationFactor: Int, serializeMailbox: Boolean): ClusterNode = if (isConnected.isOn) { import akka.serialization.ActorSerialization._ @@ -535,8 +535,8 @@ class DefaultClusterNode private[akka] ( EventHandler.debug(this, "Storing actor [%s] with UUID [%s] in cluster".format(actorRef.address, uuid)) - val actorBytes = if (shouldCompressData) LZF.compress(toBinary(actorRef, serializeMailbox)(format)) - else toBinary(actorRef)(format) + val actorBytes = if (shouldCompressData) LZF.compress(toBinary(actorRef, serializeMailbox)) + else toBinary(actorRef) val actorRegistryPath = actorRegistryPathFor(uuid) // create UUID -> Array[Byte] for actor registry @@ -668,7 +668,7 @@ class DefaultClusterNode private[akka] ( * Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available * for remote access through lookup by its UUID. */ - def use[T <: Actor](actorAddress: String, format: Serializer): Option[LocalActorRef] = if (isConnected.isOn) { + def use[T <: Actor](actorAddress: String): Option[LocalActorRef] = if (isConnected.isOn) { import akka.serialization.ActorSerialization._ @@ -699,7 +699,7 @@ class DefaultClusterNode private[akka] ( locallyCheckedOutActors += (uuid -> bytes) // FIXME switch to ReplicatedActorRef here // val actor = new ReplicatedActorRef(fromBinary[T](bytes, remoteServerAddress)(format)) - val actor = fromBinary[T](bytes, remoteServerAddress)(format) + val actor = fromBinary[T](bytes, remoteServerAddress) // remoteService.register(UUID_PREFIX + uuid, actor) // FIXME is Actor.remote.register(UUID, ..) correct here? actor.start() actor.asInstanceOf[LocalActorRef] diff --git a/akka-remote/src/main/java/akka/remote/protocol/RemoteProtocol.java b/akka-remote/src/main/java/akka/remote/protocol/RemoteProtocol.java index 5ebf1f58dd..4fd11c3cbe 100644 --- a/akka-remote/src/main/java/akka/remote/protocol/RemoteProtocol.java +++ b/akka-remote/src/main/java/akka/remote/protocol/RemoteProtocol.java @@ -3280,32 +3280,23 @@ public final class RemoteProtocol { return akka.remote.protocol.RemoteProtocol.internal_static_MessageProtocol_fieldAccessorTable; } - // required .SerializationSchemeType serializationScheme = 1; - public static final int SERIALIZATIONSCHEME_FIELD_NUMBER = 1; - private boolean hasSerializationScheme; - private akka.remote.protocol.RemoteProtocol.SerializationSchemeType serializationScheme_; - public boolean hasSerializationScheme() { return hasSerializationScheme; } - public akka.remote.protocol.RemoteProtocol.SerializationSchemeType getSerializationScheme() { return serializationScheme_; } - - // required bytes message = 2; - public static final int MESSAGE_FIELD_NUMBER = 2; + // required bytes message = 1; + public static final int MESSAGE_FIELD_NUMBER = 1; private boolean hasMessage; private com.google.protobuf.ByteString message_ = com.google.protobuf.ByteString.EMPTY; public boolean hasMessage() { return hasMessage; } public com.google.protobuf.ByteString getMessage() { return message_; } - // optional bytes messageManifest = 3; - public static final int MESSAGEMANIFEST_FIELD_NUMBER = 3; + // optional bytes messageManifest = 2; + public static final int MESSAGEMANIFEST_FIELD_NUMBER = 2; private boolean hasMessageManifest; private com.google.protobuf.ByteString messageManifest_ = com.google.protobuf.ByteString.EMPTY; public boolean hasMessageManifest() { return hasMessageManifest; } public com.google.protobuf.ByteString getMessageManifest() { return messageManifest_; } private void initFields() { - serializationScheme_ = akka.remote.protocol.RemoteProtocol.SerializationSchemeType.JAVA; } public final boolean isInitialized() { - if (!hasSerializationScheme) return false; if (!hasMessage) return false; return true; } @@ -3313,14 +3304,11 @@ public final class RemoteProtocol { public void writeTo(com.google.protobuf.CodedOutputStream output) throws java.io.IOException { getSerializedSize(); - if (hasSerializationScheme()) { - output.writeEnum(1, getSerializationScheme().getNumber()); - } if (hasMessage()) { - output.writeBytes(2, getMessage()); + output.writeBytes(1, getMessage()); } if (hasMessageManifest()) { - output.writeBytes(3, getMessageManifest()); + output.writeBytes(2, getMessageManifest()); } getUnknownFields().writeTo(output); } @@ -3331,17 +3319,13 @@ public final class RemoteProtocol { if (size != -1) return size; size = 0; - if (hasSerializationScheme()) { - size += com.google.protobuf.CodedOutputStream - .computeEnumSize(1, getSerializationScheme().getNumber()); - } if (hasMessage()) { size += com.google.protobuf.CodedOutputStream - .computeBytesSize(2, getMessage()); + .computeBytesSize(1, getMessage()); } if (hasMessageManifest()) { size += com.google.protobuf.CodedOutputStream - .computeBytesSize(3, getMessageManifest()); + .computeBytesSize(2, getMessageManifest()); } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; @@ -3501,9 +3485,6 @@ public final class RemoteProtocol { public Builder mergeFrom(akka.remote.protocol.RemoteProtocol.MessageProtocol other) { if (other == akka.remote.protocol.RemoteProtocol.MessageProtocol.getDefaultInstance()) return this; - if (other.hasSerializationScheme()) { - setSerializationScheme(other.getSerializationScheme()); - } if (other.hasMessage()) { setMessage(other.getMessage()); } @@ -3535,21 +3516,11 @@ public final class RemoteProtocol { } break; } - case 8: { - int rawValue = input.readEnum(); - akka.remote.protocol.RemoteProtocol.SerializationSchemeType value = akka.remote.protocol.RemoteProtocol.SerializationSchemeType.valueOf(rawValue); - if (value == null) { - unknownFields.mergeVarintField(1, rawValue); - } else { - setSerializationScheme(value); - } - break; - } - case 18: { + case 10: { setMessage(input.readBytes()); break; } - case 26: { + case 18: { setMessageManifest(input.readBytes()); break; } @@ -3558,28 +3529,7 @@ public final class RemoteProtocol { } - // required .SerializationSchemeType serializationScheme = 1; - public boolean hasSerializationScheme() { - return result.hasSerializationScheme(); - } - public akka.remote.protocol.RemoteProtocol.SerializationSchemeType getSerializationScheme() { - return result.getSerializationScheme(); - } - public Builder setSerializationScheme(akka.remote.protocol.RemoteProtocol.SerializationSchemeType value) { - if (value == null) { - throw new NullPointerException(); - } - result.hasSerializationScheme = true; - result.serializationScheme_ = value; - return this; - } - public Builder clearSerializationScheme() { - result.hasSerializationScheme = false; - result.serializationScheme_ = akka.remote.protocol.RemoteProtocol.SerializationSchemeType.JAVA; - return this; - } - - // required bytes message = 2; + // required bytes message = 1; public boolean hasMessage() { return result.hasMessage(); } @@ -3600,7 +3550,7 @@ public final class RemoteProtocol { return this; } - // optional bytes messageManifest = 3; + // optional bytes messageManifest = 2; public boolean hasMessageManifest() { return result.hasMessageManifest(); } @@ -5736,23 +5686,22 @@ public final class RemoteProtocol { "RemoteMessageProtocol\"g\n\037SerializedTyped" + "ActorRefProtocol\022-\n\010actorRef\030\001 \002(\0132\033.Ser" + "ializedActorRefProtocol\022\025\n\rinterfaceName" + - "\030\002 \002(\t\"r\n\017MessageProtocol\0225\n\023serializati" + - "onScheme\030\001 \002(\0162\030.SerializationSchemeType" + - "\022\017\n\007message\030\002 \002(\014\022\027\n\017messageManifest\030\003 \001" + - "(\014\"R\n\021ActorInfoProtocol\022\033\n\004uuid\030\001 \002(\0132\r.", - "UuidProtocol\022\017\n\007timeout\030\002 \002(\004\022\017\n\007address" + - "\030\003 \001(\t\")\n\014UuidProtocol\022\014\n\004high\030\001 \002(\004\022\013\n\003" + - "low\030\002 \002(\004\"3\n\025MetadataEntryProtocol\022\013\n\003ke" + - "y\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"6\n\021LifeCycleProto" + - "col\022!\n\tlifeCycle\030\001 \002(\0162\016.LifeCycleType\"1" + - "\n\017AddressProtocol\022\020\n\010hostname\030\001 \002(\t\022\014\n\004p" + - "ort\030\002 \002(\r\"7\n\021ExceptionProtocol\022\021\n\tclassn" + - "ame\030\001 \002(\t\022\017\n\007message\030\002 \002(\t*(\n\013CommandTyp" + - "e\022\013\n\007CONNECT\020\001\022\014\n\010SHUTDOWN\020\002*]\n\027Serializ" + - "ationSchemeType\022\010\n\004JAVA\020\001\022\013\n\007SBINARY\020\002\022\016", - "\n\nSCALA_JSON\020\003\022\r\n\tJAVA_JSON\020\004\022\014\n\010PROTOBU" + - "F\020\005*-\n\rLifeCycleType\022\r\n\tPERMANENT\020\001\022\r\n\tT" + - "EMPORARY\020\002B\030\n\024akka.remote.protocolH\001" + "\030\002 \002(\t\";\n\017MessageProtocol\022\017\n\007message\030\001 \002" + + "(\014\022\027\n\017messageManifest\030\002 \001(\014\"R\n\021ActorInfo" + + "Protocol\022\033\n\004uuid\030\001 \002(\0132\r.UuidProtocol\022\017\n" + + "\007timeout\030\002 \002(\004\022\017\n\007address\030\003 \001(\t\")\n\014UuidP", + "rotocol\022\014\n\004high\030\001 \002(\004\022\013\n\003low\030\002 \002(\004\"3\n\025Me" + + "tadataEntryProtocol\022\013\n\003key\030\001 \002(\t\022\r\n\005valu" + + "e\030\002 \002(\014\"6\n\021LifeCycleProtocol\022!\n\tlifeCycl" + + "e\030\001 \002(\0162\016.LifeCycleType\"1\n\017AddressProtoc" + + "ol\022\020\n\010hostname\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"7\n\021Ex" + + "ceptionProtocol\022\021\n\tclassname\030\001 \002(\t\022\017\n\007me" + + "ssage\030\002 \002(\t*(\n\013CommandType\022\013\n\007CONNECT\020\001\022" + + "\014\n\010SHUTDOWN\020\002*]\n\027SerializationSchemeType" + + "\022\010\n\004JAVA\020\001\022\013\n\007SBINARY\020\002\022\016\n\nSCALA_JSON\020\003\022" + + "\r\n\tJAVA_JSON\020\004\022\014\n\010PROTOBUF\020\005*-\n\rLifeCycl", + "eType\022\r\n\tPERMANENT\020\001\022\r\n\tTEMPORARY\020\002B\030\n\024a" + + "kka.remote.protocolH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -5812,7 +5761,7 @@ public final class RemoteProtocol { internal_static_MessageProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_MessageProtocol_descriptor, - new java.lang.String[] { "SerializationScheme", "Message", "MessageManifest", }, + new java.lang.String[] { "Message", "MessageManifest", }, akka.remote.protocol.RemoteProtocol.MessageProtocol.class, akka.remote.protocol.RemoteProtocol.MessageProtocol.Builder.class); internal_static_ActorInfoProtocol_descriptor = diff --git a/akka-remote/src/main/protocol/RemoteProtocol.proto b/akka-remote/src/main/protocol/RemoteProtocol.proto index 795a58814c..2a25ff6e0e 100644 --- a/akka-remote/src/main/protocol/RemoteProtocol.proto +++ b/akka-remote/src/main/protocol/RemoteProtocol.proto @@ -89,9 +89,8 @@ message SerializedTypedActorRefProtocol { * Defines a message. */ message MessageProtocol { - required SerializationSchemeType serializationScheme = 1; - required bytes message = 2; - optional bytes messageManifest = 3; + required bytes message = 1; + optional bytes messageManifest = 2; } /** diff --git a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala index 98bf8400cf..181acb7f03 100644 --- a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala @@ -4,98 +4,30 @@ package akka.remote -import akka.serialization.{ Serializers, Serializable } import akka.remote.protocol.RemoteProtocol._ import akka.util._ +import akka.serialization.Serialization import com.google.protobuf.{ Message, ByteString } object MessageSerializer { - private def SERIALIZER_JAVA: Serializers.Java = Serializers.Java - private def SERIALIZER_JAVA_JSON: Serializers.JavaJSON = Serializers.JavaJSON - private def SERIALIZER_SCALA_JSON: Serializers.ScalaJSON = Serializers.ScalaJSON - private def SERIALIZER_PROTOBUF: Serializers.Protobuf = Serializers.Protobuf - def setClassLoader(cl: ClassLoader) = { - val someCl = Some(cl) - SERIALIZER_JAVA.classLoader = someCl - SERIALIZER_JAVA_JSON.classLoader = someCl - SERIALIZER_SCALA_JSON.classLoader = someCl + def deserialize(messageProtocol: MessageProtocol, classLoader: Option[ClassLoader] = None): AnyRef = { + val clazz = loadManifest(classLoader, messageProtocol) + Serialization.deserialize(messageProtocol.getMessage.toByteArray, + clazz, classLoader).fold(x ⇒ throw x, o ⇒ o) } - def deserialize(messageProtocol: MessageProtocol): Any = { - messageProtocol.getSerializationScheme match { - case SerializationSchemeType.JAVA ⇒ - unbox(SERIALIZER_JAVA.fromBinary(messageProtocol.getMessage.toByteArray, None)) - - case SerializationSchemeType.PROTOBUF ⇒ - val clazz = loadManifest(SERIALIZER_PROTOBUF.classLoader, messageProtocol) - SERIALIZER_PROTOBUF.fromBinary(messageProtocol.getMessage.toByteArray, Some(clazz)) - - case SerializationSchemeType.SCALA_JSON ⇒ - val clazz = loadManifest(SERIALIZER_SCALA_JSON.classLoader, messageProtocol) - val renderer = clazz.newInstance.asInstanceOf[Serializable.ScalaJSON[_]] - renderer.fromBytes(messageProtocol.getMessage.toByteArray) - - case SerializationSchemeType.JAVA_JSON ⇒ - val clazz = loadManifest(SERIALIZER_JAVA_JSON.classLoader, messageProtocol) - SERIALIZER_JAVA_JSON.fromBinary(messageProtocol.getMessage.toByteArray, Some(clazz)) - } - } - - def serialize(message: Any): MessageProtocol = { + def serialize(message: AnyRef): MessageProtocol = { val builder = MessageProtocol.newBuilder - if (message.isInstanceOf[Message]) { - val serializable = message.asInstanceOf[Message] - builder.setSerializationScheme(SerializationSchemeType.PROTOBUF) - builder.setMessage(ByteString.copyFrom(serializable.toByteArray)) - builder.setMessageManifest(ByteString.copyFromUtf8(serializable.getClass.getName)) - } else if (message.isInstanceOf[Serializable.ScalaJSON[_]]) { - builder.setSerializationScheme(SerializationSchemeType.SCALA_JSON) - setMessageAndManifest(builder, message.asInstanceOf[Serializable.ScalaJSON[_ <: Any]]) - } else if (message.isInstanceOf[Serializable.JavaJSON]) { - builder.setSerializationScheme(SerializationSchemeType.JAVA_JSON) - setMessageAndManifest(builder, message.asInstanceOf[Serializable.JavaJSON]) - } else { - // default, e.g. if no protocol used explicitly then use Java serialization - builder.setSerializationScheme(SerializationSchemeType.JAVA) - builder.setMessage(ByteString.copyFrom(SERIALIZER_JAVA.toBinary(box(message)))) - } + val bytes = Serialization.serialize(message).fold(x ⇒ throw x, b ⇒ b) + builder.setMessage(ByteString.copyFrom(bytes)) + builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName)) builder.build } private def loadManifest(classLoader: Option[ClassLoader], messageProtocol: MessageProtocol): Class[_] = { val manifest = messageProtocol.getMessageManifest.toStringUtf8 - if (classLoader.isDefined) classLoader.get.loadClass(manifest) - else Class.forName(manifest) - } - - private def setMessageAndManifest(builder: MessageProtocol.Builder, serializable: Serializable) = { - builder.setMessage(ByteString.copyFrom(serializable.toBytes)) - builder.setMessageManifest(ByteString.copyFromUtf8(serializable.getClass.getName)) - } - - private def box(value: Any): AnyRef = value match { - case value: Boolean ⇒ new java.lang.Boolean(value) - case value: Char ⇒ new java.lang.Character(value) - case value: Short ⇒ new java.lang.Short(value) - case value: Int ⇒ new java.lang.Integer(value) - case value: Long ⇒ new java.lang.Long(value) - case value: Float ⇒ new java.lang.Float(value) - case value: Double ⇒ new java.lang.Double(value) - case value: Byte ⇒ new java.lang.Byte(value) - case value ⇒ value.asInstanceOf[AnyRef] - } - - private def unbox(value: AnyRef): Any = value match { - case value: java.lang.Boolean ⇒ value.booleanValue - case value: java.lang.Character ⇒ value.charValue - case value: java.lang.Short ⇒ value.shortValue - case value: java.lang.Integer ⇒ value.intValue - case value: java.lang.Long ⇒ value.longValue - case value: java.lang.Float ⇒ value.floatValue - case value: java.lang.Double ⇒ value.doubleValue - case value: java.lang.Byte ⇒ value.byteValue - case value ⇒ value + classLoader map (_.loadClass(manifest)) getOrElse (Class.forName(manifest)) } } diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 5ffc4b16f3..dae668ca95 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -84,7 +84,7 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem private[akka] def withClientFor[T]( address: InetSocketAddress, loader: Option[ClassLoader])(fun: RemoteClient ⇒ T): T = { - loader.foreach(MessageSerializer.setClassLoader(_)) + // loader.foreach(MessageSerializer.setClassLoader(_)) val key = Address(address) lock.readLock.lock try { @@ -804,7 +804,7 @@ class RemoteServerHandler( val server: NettyRemoteServerModule) extends SimpleChannelUpstreamHandler { import RemoteServerSettings._ - applicationLoader.foreach(MessageSerializer.setClassLoader(_)) //TODO: REVISIT: THIS FEELS A BIT DODGY + // applicationLoader.foreach(MessageSerializer.setClassLoader(_)) //TODO: REVISIT: THIS FEELS A BIT DODGY val sessionActors = new ChannelLocal[ConcurrentHashMap[String, ActorRef]]() diff --git a/akka-remote/src/main/scala/akka/serialization/Serializable.scala b/akka-remote/src/main/scala/akka/serialization/Serializable.scala deleted file mode 100644 index 3f3296d608..0000000000 --- a/akka-remote/src/main/scala/akka/serialization/Serializable.scala +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB - */ - -package akka.serialization - -import org.codehaus.jackson.map.ObjectMapper - -import com.google.protobuf.Message - -import reflect.Manifest - -import java.io.{ StringWriter, ByteArrayOutputStream, ObjectOutputStream } - -import sjson.json.{ Serializer ⇒ SJSONSerializer } - -/** - * @author Jonas Bonér - */ -trait Serializable { - def toBytes: Array[Byte] -} - -/** - * Serialization protocols. - * - * @author Jonas Bonér - */ -object Serializable { - - /** - * @author Jonas Bonér - */ - trait JSON extends Serializable { - def toJSON: String - } - - /** - * @author Jonas Bonér - */ - abstract class JavaJSON extends JSON { - - def toJSON: String = { - val out = new StringWriter - val mapper = new ObjectMapper - mapper.writeValue(out, this) - out.close - out.toString - } - - def toBytes: Array[Byte] = { - val bos = new ByteArrayOutputStream - val out = new ObjectOutputStream(bos) - val mapper = new ObjectMapper - mapper.writeValue(out, this) - out.close - bos.toByteArray - } - } - - /** - * case class Address(street: String, city: String, zip: String) - * extends ScalaJSON[Address] { - * - * implicit val AddressFormat: Format[Address] = - * asProduct3("street", "city", "zip")(Address)(Address.unapply(_).get) - * - * import dispatch.json._ - * import sjson.json._ - * import sjson.json.JsonSerialization._ - * - * def toJSON: String = JsValue.toJson(tojson(this)) - * def toBytes: Array[Byte] = tobinary(this) - * def fromBytes(bytes: Array[Byte]): Address = frombinary[Address](bytes) - * def fromJSON(js: String): Address = fromjson[Address](Js(js)) - * } - * - * val a = Address(...) - * val js = tojson(a) - * val add = fromjson[Address](js) - * - * @author Jonas Bonér - */ - trait ScalaJSON[T] extends JSON { - def toJSON: String - def fromJSON(js: String): T - def toBytes: Array[Byte] - def fromBytes(bytes: Array[Byte]): T - } -} diff --git a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala index 8a34a0ec5e..5c0b563c15 100644 --- a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -23,27 +23,27 @@ import akka.remote.{ RemoteClientSettings, MessageSerializer } * Module for local actor serialization. */ object ActorSerialization { - implicit val defaultSerializer = Format.Default + implicit val defaultSerializer = akka.serialization.JavaSerializer // Format.Default - def fromBinary[T <: Actor](bytes: Array[Byte], homeAddress: InetSocketAddress)(implicit format: Serializer): ActorRef = - fromBinaryToLocalActorRef(bytes, Some(homeAddress), format) + def fromBinary[T <: Actor](bytes: Array[Byte], homeAddress: InetSocketAddress): ActorRef = + fromBinaryToLocalActorRef(bytes, Some(homeAddress)) - def fromBinary[T <: Actor](bytes: Array[Byte])(implicit format: Serializer): ActorRef = - fromBinaryToLocalActorRef(bytes, None, format) + def fromBinary[T <: Actor](bytes: Array[Byte]): ActorRef = + fromBinaryToLocalActorRef(bytes, None) - def toBinary[T <: Actor](a: ActorRef, serializeMailBox: Boolean = true)(implicit format: Serializer): Array[Byte] = - toSerializedActorRefProtocol(a, format, serializeMailBox).toByteArray + def toBinary[T <: Actor](a: ActorRef, serializeMailBox: Boolean = true): Array[Byte] = + toSerializedActorRefProtocol(a, serializeMailBox).toByteArray // wrapper for implicits to be used by Java - def fromBinaryJ[T <: Actor](bytes: Array[Byte], format: Serializer): ActorRef = - fromBinary(bytes)(format) + def fromBinaryJ[T <: Actor](bytes: Array[Byte]): ActorRef = + fromBinary(bytes) // wrapper for implicits to be used by Java - def toBinaryJ[T <: Actor](a: ActorRef, format: Serializer, srlMailBox: Boolean = true): Array[Byte] = - toBinary(a, srlMailBox)(format) + def toBinaryJ[T <: Actor](a: ActorRef, srlMailBox: Boolean = true): Array[Byte] = + toBinary(a, srlMailBox) private[akka] def toSerializedActorRefProtocol[T <: Actor]( - actorRef: ActorRef, format: Serializer, serializeMailBox: Boolean = true): SerializedActorRefProtocol = { + actorRef: ActorRef, serializeMailBox: Boolean = true): SerializedActorRefProtocol = { val lifeCycleProtocol: Option[LifeCycleProtocol] = { actorRef.lifeCycle match { case Permanent ⇒ Some(LifeCycleProtocol.newBuilder.setLifeCycle(LifeCycleType.PERMANENT).build) @@ -84,23 +84,28 @@ object ActorSerialization { } actorRef.receiveTimeout.foreach(builder.setReceiveTimeout(_)) - builder.setActorInstance(ByteString.copyFrom(format.toBinary(actorRef.actor.asInstanceOf[T]))) + // builder.setActorInstance(ByteString.copyFrom(format.toBinary(actorRef.actor.asInstanceOf[T]))) + Serialization.serialize(actorRef.actor.asInstanceOf[T]) match { + case Right(bytes) ⇒ builder.setActorInstance(ByteString.copyFrom(bytes)) + case Left(exception) ⇒ throw new Exception("Error serializing : " + actorRef.actor.getClass.getName) + } + lifeCycleProtocol.foreach(builder.setLifeCycle(_)) actorRef.supervisor.foreach(s ⇒ builder.setSupervisor(RemoteActorSerialization.toRemoteActorRefProtocol(s))) - if (!actorRef.hotswap.isEmpty) builder.setHotswapStack(ByteString.copyFrom(Serializers.Java.toBinary(actorRef.hotswap))) + // if (!actorRef.hotswap.isEmpty) builder.setHotswapStack(ByteString.copyFrom(Serializers.Java.toBinary(actorRef.hotswap))) + if (!actorRef.hotswap.isEmpty) builder.setHotswapStack(ByteString.copyFrom(akka.serialization.JavaSerializer.toBinary(actorRef.hotswap))) builder.build } private def fromBinaryToLocalActorRef[T <: Actor]( bytes: Array[Byte], - homeAddress: Option[InetSocketAddress], - format: Serializer): ActorRef = { + homeAddress: Option[InetSocketAddress]): ActorRef = { val builder = SerializedActorRefProtocol.newBuilder.mergeFrom(bytes) - fromProtobufToLocalActorRef(builder.build, format, None) + fromProtobufToLocalActorRef(builder.build, None) } private[akka] def fromProtobufToLocalActorRef[T <: Actor]( - protocol: SerializedActorRefProtocol, format: Serializer, loader: Option[ClassLoader]): ActorRef = { + protocol: SerializedActorRefProtocol, loader: Option[ClassLoader]): ActorRef = { val lifeCycle = if (protocol.hasLifeCycle) { @@ -117,9 +122,13 @@ object ActorSerialization { val hotswap = try { - format - .fromBinary(protocol.getHotswapStack.toByteArray, Some(classOf[Stack[PartialFunction[Any, Unit]]])) - .asInstanceOf[Stack[PartialFunction[Any, Unit]]] + Serialization.deserialize(protocol.getHotswapStack.toByteArray, classOf[Stack[PartialFunction[Any, Unit]]], loader) match { + case Right(r) ⇒ r.asInstanceOf[Stack[PartialFunction[Any, Unit]]] + case Left(ex) ⇒ throw new Exception("Cannot de-serialize hotswapstack") + } + // format + // .fromBinary(protocol.getHotswapStack.toByteArray, Some(classOf[Stack[PartialFunction[Any, Unit]]])) + // .asInstanceOf[Stack[PartialFunction[Any, Unit]]] } catch { case e: Exception ⇒ Stack[PartialFunction[Any, Unit]]() } @@ -129,7 +138,11 @@ object ActorSerialization { val factory = () ⇒ { val actorClass = classLoader.loadClass(protocol.getActorClassname) try { - format.fromBinary(protocol.getActorInstance.toByteArray, Some(actorClass)).asInstanceOf[Actor] + Serialization.deserialize(protocol.getActorInstance.toByteArray, actorClass, loader) match { + case Right(r) ⇒ r.asInstanceOf[Actor] + case Left(ex) ⇒ throw new Exception("Cannot de-serialize : " + actorClass) + } + // format.fromBinary(protocol.getActorInstance.toByteArray, Some(actorClass)).asInstanceOf[Actor] } catch { case e: Exception ⇒ actorClass.newInstance.asInstanceOf[Actor] } @@ -146,11 +159,7 @@ object ActorSerialization { factory) val messages = protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteMessageProtocol]] - messages.foreach(message ⇒ ar ! MessageSerializer.deserialize(message.getMessage)) - - //if (format.isInstanceOf[SerializerBasedActorFormat[_]] == false) - // format.fromBinary(protocol.getActorInstance.toByteArray, ar.actor.asInstanceOf[T]) - //ar + messages.foreach(message ⇒ ar ! MessageSerializer.deserialize(message.getMessage, Some(classLoader))) ar } } @@ -174,7 +183,7 @@ object RemoteActorSerialization { */ private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = { RemoteActorRef( - Serializers.Java.fromBinary(protocol.getInetSocketAddress.toByteArray, Some(classOf[InetSocketAddress])).asInstanceOf[InetSocketAddress], + JavaSerializer.fromBinary(protocol.getInetSocketAddress.toByteArray, Some(classOf[InetSocketAddress]), loader).asInstanceOf[InetSocketAddress], protocol.getAddress, protocol.getTimeout, loader) @@ -194,7 +203,7 @@ object RemoteActorSerialization { ReflectiveAccess.RemoteModule.configDefaultAddress } RemoteActorRefProtocol.newBuilder - .setInetSocketAddress(ByteString.copyFrom(Serializers.Java.toBinary(remoteAddress))) + .setInetSocketAddress(ByteString.copyFrom(JavaSerializer.toBinary(remoteAddress))) .setAddress(actor.address) .setTimeout(actor.timeout) .build @@ -230,7 +239,7 @@ object RemoteActorSerialization { message match { case Right(message) ⇒ - messageBuilder.setMessage(MessageSerializer.serialize(message)) + messageBuilder.setMessage(MessageSerializer.serialize(message.asInstanceOf[AnyRef])) case Left(exception) ⇒ messageBuilder.setException(ExceptionProtocol.newBuilder .setClassname(exception.getClass.getName) diff --git a/akka-remote/src/main/scala/akka/serialization/Serializer.scala b/akka-remote/src/main/scala/akka/serialization/Serializer.scala index 9c387503d3..7ac3eea2df 100644 --- a/akka-remote/src/main/scala/akka/serialization/Serializer.scala +++ b/akka-remote/src/main/scala/akka/serialization/Serializer.scala @@ -1,149 +1,39 @@ +package akka.serialization + /** * Copyright (C) 2009-2011 Scalable Solutions AB */ -package akka.serialization - import java.io.{ ObjectOutputStream, ByteArrayOutputStream, ObjectInputStream, ByteArrayInputStream } +import akka.util.ClassLoaderObjectInputStream +import akka.actor.ActorRef -import org.apache.commons.io.input.ClassLoaderObjectInputStream - -import com.google.protobuf.Message - -import org.codehaus.jackson.map.ObjectMapper - -import sjson.json.{ Serializer ⇒ SJSONSerializer } - -// For Java API -class SerializerFactory { - import Serializers._ - def getJava: Java.type = Java - def getJavaJSON: JavaJSON.type = JavaJSON - def getScalaJSON: ScalaJSON.type = ScalaJSON - def getProtobuf: Protobuf.type = Protobuf +trait Serializer extends scala.Serializable { + def toBinary(o: AnyRef): Array[Byte] + def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]] = None, classLoader: Option[ClassLoader] = None): AnyRef } -/** - * @author Jonas Bonér - */ -object Serializers { - val ARRAY_OF_BYTE_ARRAY = Array[Class[_]](classOf[Array[Byte]]) - - object NOOP extends NOOP - class NOOP extends Serializer { - def toBinary(obj: AnyRef): Array[Byte] = Array[Byte]() - def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = null.asInstanceOf[AnyRef] +class JavaSerializer extends Serializer { + def toBinary(o: AnyRef): Array[Byte] = { + val bos = new ByteArrayOutputStream + val out = new ObjectOutputStream(bos) + out.writeObject(o) + out.close() + bos.toByteArray } - /** - * @author Jonas Bonér - */ - object Java extends Java - trait Java extends Serializer { - def toBinary(obj: AnyRef): Array[Byte] = { - val bos = new ByteArrayOutputStream - val out = new ObjectOutputStream(bos) - out.writeObject(obj) - out.close - bos.toByteArray - } - - def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = { - val in = - if (classLoader.isDefined) new ClassLoaderObjectInputStream(classLoader.get, new ByteArrayInputStream(bytes)) - else new ObjectInputStream(new ByteArrayInputStream(bytes)) - val obj = in.readObject - in.close - obj - } + def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]] = None, + classLoader: Option[ClassLoader] = None): AnyRef = { + val in = + if (classLoader.isDefined) new ClassLoaderObjectInputStream(classLoader.get, new ByteArrayInputStream(bytes)) else + new ObjectInputStream(new ByteArrayInputStream(bytes)) + val obj = in.readObject + in.close() + obj } - - /** - * @author Jonas Bonér - */ - object Protobuf extends Protobuf - trait Protobuf extends Serializer { - def toBinary(obj: AnyRef): Array[Byte] = { - if (!obj.isInstanceOf[Message]) throw new IllegalArgumentException( - "Can't serialize a non-protobuf message using protobuf [" + obj + "]") - obj.asInstanceOf[Message].toByteArray - } - - def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = { - if (!clazz.isDefined) throw new IllegalArgumentException( - "Need a protobuf message class to be able to serialize bytes using protobuf") - clazz.get.getDeclaredMethod("parseFrom", ARRAY_OF_BYTE_ARRAY: _*).invoke(null, bytes).asInstanceOf[Message] - } - - def fromBinary(bytes: Array[Byte], clazz: Class[_]): AnyRef = { - if (clazz eq null) throw new IllegalArgumentException("Protobuf message can't be null") - fromBinary(bytes, Some(clazz)) - } - } - - /** - * @author Jonas Bonér - */ - object JavaJSON extends JavaJSON - trait JavaJSON extends Serializer { - private val mapper = new ObjectMapper - - def toBinary(obj: AnyRef): Array[Byte] = { - val bos = new ByteArrayOutputStream - val out = new ObjectOutputStream(bos) - mapper.writeValue(out, obj) - out.close - bos.toByteArray - } - - def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = { - if (!clazz.isDefined) throw new IllegalArgumentException( - "Can't deserialize JSON to instance if no class is provided") - val in = - if (classLoader.isDefined) new ClassLoaderObjectInputStream(classLoader.get, new ByteArrayInputStream(bytes)) - else new ObjectInputStream(new ByteArrayInputStream(bytes)) - val obj = mapper.readValue(in, clazz.get).asInstanceOf[AnyRef] - in.close - obj - } - - def fromJSON(json: String, clazz: Class[_]): AnyRef = { - if (clazz eq null) throw new IllegalArgumentException("Can't deserialize JSON to instance if no class is provided") - mapper.readValue(json, clazz).asInstanceOf[AnyRef] - } - } - - /** - * @author Jonas Bonér - */ - trait ScalaJSON { - import sjson.json._ - - var classLoader: Option[ClassLoader] = None - - def tojson[T](o: T)(implicit tjs: Writes[T]): JsValue = JsonSerialization.tojson(o)(tjs) - - def fromjson[T](json: JsValue)(implicit fjs: Reads[T]): T = JsonSerialization.fromjson(json)(fjs) - - def tobinary[T](o: T)(implicit tjs: Writes[T]): Array[Byte] = JsonSerialization.tobinary(o)(tjs) - - def frombinary[T](bytes: Array[Byte])(implicit fjs: Reads[T]): T = JsonSerialization.frombinary(bytes)(fjs) - - // backward compatibility - // implemented using refelction based json serialization - def toBinary(obj: AnyRef): Array[Byte] = SJSONSerializer.SJSON.out(obj) - - def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = SJSONSerializer.SJSON.in(bytes) - - import scala.reflect.Manifest - def fromJSON[T](json: String)(implicit m: Manifest[T]): AnyRef = { - SJSONSerializer.SJSON.in(json)(m) - } - - def fromBinary[T](bytes: Array[Byte])(implicit m: Manifest[T]): AnyRef = { - SJSONSerializer.SJSON.in(bytes)(m) - } - } - object ScalaJSON extends ScalaJSON } +object JavaSerializer extends JavaSerializer +object Serializer { + val defaultSerializerName = JavaSerializer.getClass.getName +} diff --git a/akka-remote/src/test/java/akka/actor/ProtobufProtocol.java b/akka-remote/src/test/java/akka/actor/ProtobufProtocol.java new file mode 100644 index 0000000000..3b900d2e6c --- /dev/null +++ b/akka-remote/src/test/java/akka/actor/ProtobufProtocol.java @@ -0,0 +1,422 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: ProtobufProtocol.proto + +package akka.actor; + +public final class ProtobufProtocol { + private ProtobufProtocol() {} + public static void registerAllExtensions( + com.google.protobuf.ExtensionRegistry registry) { + } + public static final class MyMessage extends + com.google.protobuf.GeneratedMessage { + // Use MyMessage.newBuilder() to construct. + private MyMessage() { + initFields(); + } + private MyMessage(boolean noInit) {} + + private static final MyMessage defaultInstance; + public static MyMessage getDefaultInstance() { + return defaultInstance; + } + + public MyMessage getDefaultInstanceForType() { + return defaultInstance; + } + + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.actor.ProtobufProtocol.internal_static_akka_actor_MyMessage_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.actor.ProtobufProtocol.internal_static_akka_actor_MyMessage_fieldAccessorTable; + } + + // required uint64 id = 1; + public static final int ID_FIELD_NUMBER = 1; + private boolean hasId; + private long id_ = 0L; + public boolean hasId() { return hasId; } + public long getId() { return id_; } + + // required string name = 2; + public static final int NAME_FIELD_NUMBER = 2; + private boolean hasName; + private java.lang.String name_ = ""; + public boolean hasName() { return hasName; } + public java.lang.String getName() { return name_; } + + // required bool status = 3; + public static final int STATUS_FIELD_NUMBER = 3; + private boolean hasStatus; + private boolean status_ = false; + public boolean hasStatus() { return hasStatus; } + public boolean getStatus() { return status_; } + + private void initFields() { + } + public final boolean isInitialized() { + if (!hasId) return false; + if (!hasName) return false; + if (!hasStatus) return false; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (hasId()) { + output.writeUInt64(1, getId()); + } + if (hasName()) { + output.writeString(2, getName()); + } + if (hasStatus()) { + output.writeBool(3, getStatus()); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (hasId()) { + size += com.google.protobuf.CodedOutputStream + .computeUInt64Size(1, getId()); + } + if (hasName()) { + size += com.google.protobuf.CodedOutputStream + .computeStringSize(2, getName()); + } + if (hasStatus()) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(3, getStatus()); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + public static akka.actor.ProtobufProtocol.MyMessage parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static akka.actor.ProtobufProtocol.MyMessage parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static akka.actor.ProtobufProtocol.MyMessage parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static akka.actor.ProtobufProtocol.MyMessage parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static akka.actor.ProtobufProtocol.MyMessage parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static akka.actor.ProtobufProtocol.MyMessage parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static akka.actor.ProtobufProtocol.MyMessage parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static akka.actor.ProtobufProtocol.MyMessage parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input, extensionRegistry)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static akka.actor.ProtobufProtocol.MyMessage parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static akka.actor.ProtobufProtocol.MyMessage parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(akka.actor.ProtobufProtocol.MyMessage prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder { + private akka.actor.ProtobufProtocol.MyMessage result; + + // Construct using akka.actor.ProtobufProtocol.MyMessage.newBuilder() + private Builder() {} + + private static Builder create() { + Builder builder = new Builder(); + builder.result = new akka.actor.ProtobufProtocol.MyMessage(); + return builder; + } + + protected akka.actor.ProtobufProtocol.MyMessage internalGetResult() { + return result; + } + + public Builder clear() { + if (result == null) { + throw new IllegalStateException( + "Cannot call clear() after build()."); + } + result = new akka.actor.ProtobufProtocol.MyMessage(); + return this; + } + + public Builder clone() { + return create().mergeFrom(result); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return akka.actor.ProtobufProtocol.MyMessage.getDescriptor(); + } + + public akka.actor.ProtobufProtocol.MyMessage getDefaultInstanceForType() { + return akka.actor.ProtobufProtocol.MyMessage.getDefaultInstance(); + } + + public boolean isInitialized() { + return result.isInitialized(); + } + public akka.actor.ProtobufProtocol.MyMessage build() { + if (result != null && !isInitialized()) { + throw newUninitializedMessageException(result); + } + return buildPartial(); + } + + private akka.actor.ProtobufProtocol.MyMessage buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + if (!isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return buildPartial(); + } + + public akka.actor.ProtobufProtocol.MyMessage buildPartial() { + if (result == null) { + throw new IllegalStateException( + "build() has already been called on this Builder."); + } + akka.actor.ProtobufProtocol.MyMessage returnMe = result; + result = null; + return returnMe; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof akka.actor.ProtobufProtocol.MyMessage) { + return mergeFrom((akka.actor.ProtobufProtocol.MyMessage)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(akka.actor.ProtobufProtocol.MyMessage other) { + if (other == akka.actor.ProtobufProtocol.MyMessage.getDefaultInstance()) return this; + if (other.hasId()) { + setId(other.getId()); + } + if (other.hasName()) { + setName(other.getName()); + } + if (other.hasStatus()) { + setStatus(other.getStatus()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder( + this.getUnknownFields()); + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + this.setUnknownFields(unknownFields.build()); + return this; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + this.setUnknownFields(unknownFields.build()); + return this; + } + break; + } + case 8: { + setId(input.readUInt64()); + break; + } + case 18: { + setName(input.readString()); + break; + } + case 24: { + setStatus(input.readBool()); + break; + } + } + } + } + + + // required uint64 id = 1; + public boolean hasId() { + return result.hasId(); + } + public long getId() { + return result.getId(); + } + public Builder setId(long value) { + result.hasId = true; + result.id_ = value; + return this; + } + public Builder clearId() { + result.hasId = false; + result.id_ = 0L; + return this; + } + + // required string name = 2; + public boolean hasName() { + return result.hasName(); + } + public java.lang.String getName() { + return result.getName(); + } + public Builder setName(java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + result.hasName = true; + result.name_ = value; + return this; + } + public Builder clearName() { + result.hasName = false; + result.name_ = getDefaultInstance().getName(); + return this; + } + + // required bool status = 3; + public boolean hasStatus() { + return result.hasStatus(); + } + public boolean getStatus() { + return result.getStatus(); + } + public Builder setStatus(boolean value) { + result.hasStatus = true; + result.status_ = value; + return this; + } + public Builder clearStatus() { + result.hasStatus = false; + result.status_ = false; + return this; + } + + // @@protoc_insertion_point(builder_scope:akka.actor.MyMessage) + } + + static { + defaultInstance = new MyMessage(true); + akka.actor.ProtobufProtocol.internalForceInit(); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:akka.actor.MyMessage) + } + + private static com.google.protobuf.Descriptors.Descriptor + internal_static_akka_actor_MyMessage_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_akka_actor_MyMessage_fieldAccessorTable; + + public static com.google.protobuf.Descriptors.FileDescriptor + getDescriptor() { + return descriptor; + } + private static com.google.protobuf.Descriptors.FileDescriptor + descriptor; + static { + java.lang.String[] descriptorData = { + "\n\026ProtobufProtocol.proto\022\nakka.actor\"5\n\t" + + "MyMessage\022\n\n\002id\030\001 \002(\004\022\014\n\004name\030\002 \002(\t\022\016\n\006s" + + "tatus\030\003 \002(\010" + }; + com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = + new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { + public com.google.protobuf.ExtensionRegistry assignDescriptors( + com.google.protobuf.Descriptors.FileDescriptor root) { + descriptor = root; + internal_static_akka_actor_MyMessage_descriptor = + getDescriptor().getMessageTypes().get(0); + internal_static_akka_actor_MyMessage_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_akka_actor_MyMessage_descriptor, + new java.lang.String[] { "Id", "Name", "Status", }, + akka.actor.ProtobufProtocol.MyMessage.class, + akka.actor.ProtobufProtocol.MyMessage.Builder.class); + return null; + } + }; + com.google.protobuf.Descriptors.FileDescriptor + .internalBuildGeneratedFileFrom(descriptorData, + new com.google.protobuf.Descriptors.FileDescriptor[] { + }, assigner); + } + + public static void internalForceInit() {} + + // @@protoc_insertion_point(outer_class_scope) +} diff --git a/akka-remote/src/test/protocol/ProtobufProtocol.proto b/akka-remote/src/test/protocol/ProtobufProtocol.proto new file mode 100644 index 0000000000..5e41f75978 --- /dev/null +++ b/akka-remote/src/test/protocol/ProtobufProtocol.proto @@ -0,0 +1,18 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.actor; + +/* + Compile with: + cd ./akka-remote/src/test/protocol + protoc ProtobufProtocol.proto --java_out ../java +*/ + +message MyMessage { + required uint64 id = 1; + required string name = 2; + required bool status = 3; +} + diff --git a/akka-remote/src/test/scala/akka/serialization/ActorSerializeSpec.scala b/akka-remote/src/test/scala/akka/serialization/ActorSerializeSpec.scala new file mode 100644 index 0000000000..54f1bb565c --- /dev/null +++ b/akka-remote/src/test/scala/akka/serialization/ActorSerializeSpec.scala @@ -0,0 +1,169 @@ +package akka.serialization + +import org.scalatest.Spec +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.BeforeAndAfterAll +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith + +import com.google.protobuf.Message + +import akka.serialization.ActorSerialization._ +import akka.actor._ +import Actor._ +import SerializeSpec._ + +case class MyMessage(id: Long, name: String, status: Boolean) +@RunWith(classOf[JUnitRunner]) +class ActorSerializeSpec extends Spec with ShouldMatchers with BeforeAndAfterAll { + + describe("Serializable actor") { + it("should be able to serialize and de-serialize a stateful actor with a given serializer") { + + val actor1 = actorOf[MyJavaSerializableActor].start() + (actor1 !! "hello").getOrElse("_") should equal("world 1") + (actor1 !! "hello").getOrElse("_") should equal("world 2") + + val bytes = toBinary(actor1) + val actor2 = fromBinary(bytes) + actor2.start() + (actor2 !! "hello").getOrElse("_") should equal("world 3") + + actor2.receiveTimeout should equal(Some(1000)) + actor1.stop() + actor2.stop() + } + + it("should be able to serialize and deserialize a MyStatelessActorWithMessagesInMailbox") { + + val actor1 = actorOf[MyStatelessActorWithMessagesInMailbox].start() + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + actor1.mailboxSize should be > (0) + val actor2 = fromBinary(toBinary(actor1)) + Thread.sleep(1000) + actor2.mailboxSize should be > (0) + (actor2 !! "hello-reply").getOrElse("_") should equal("world") + + val actor3 = fromBinary(toBinary(actor1, false)) + Thread.sleep(1000) + actor3.mailboxSize should equal(0) + (actor3 !! "hello-reply").getOrElse("_") should equal("world") + } + + it("should be able to serialize and deserialize a PersonActorWithMessagesInMailbox") { + + val p1 = Person("debasish ghosh", 25, SerializeSpec.Address("120", "Monroe Street", "Santa Clara", "95050")) + val actor1 = actorOf[PersonActorWithMessagesInMailbox].start() + (actor1 ! p1) + (actor1 ! p1) + (actor1 ! p1) + (actor1 ! p1) + (actor1 ! p1) + (actor1 ! p1) + (actor1 ! p1) + (actor1 ! p1) + (actor1 ! p1) + (actor1 ! p1) + actor1.mailboxSize should be > (0) + val actor2 = fromBinary(toBinary(actor1)) + Thread.sleep(1000) + actor2.mailboxSize should be > (0) + (actor2 !! "hello-reply").getOrElse("_") should equal("hello") + + val actor3 = fromBinary(toBinary(actor1, false)) + Thread.sleep(1000) + actor3.mailboxSize should equal(0) + (actor3 !! "hello-reply").getOrElse("_") should equal("hello") + } + } + + describe("serialize protobuf") { + it("should serialize") { + val msg = MyMessage(123, "debasish ghosh", true) + import akka.serialization.Serialization._ + val b = serialize(ProtobufProtocol.MyMessage.newBuilder.setId(msg.id).setName(msg.name).setStatus(msg.status).build) match { + case Left(exception) ⇒ fail(exception) + case Right(bytes) ⇒ bytes + } + val in = deserialize(b, classOf[ProtobufProtocol.MyMessage], None) match { + case Left(exception) ⇒ fail(exception) + case Right(i) ⇒ i + } + val m = in.asInstanceOf[ProtobufProtocol.MyMessage] + MyMessage(m.getId, m.getName, m.getStatus) should equal(msg) + } + } + + describe("serialize actor that accepts protobuf message") { + it("should serialize") { + + val actor1 = actorOf[MyActorWithProtobufMessagesInMailbox].start() + val msg = MyMessage(123, "debasish ghosh", true) + val b = ProtobufProtocol.MyMessage.newBuilder.setId(msg.id).setName(msg.name).setStatus(msg.status).build + (actor1 ! b) + (actor1 ! b) + (actor1 ! b) + (actor1 ! b) + (actor1 ! b) + (actor1 ! b) + (actor1 ! b) + (actor1 ! b) + (actor1 ! b) + (actor1 ! b) + actor1.mailboxSize should be > (0) + val actor2 = fromBinary(toBinary(actor1)) + Thread.sleep(1000) + actor2.mailboxSize should be > (0) + (actor2 !! "hello-reply").getOrElse("_") should equal("world") + + val actor3 = fromBinary(toBinary(actor1, false)) + Thread.sleep(1000) + actor3.mailboxSize should equal(0) + (actor3 !! "hello-reply").getOrElse("_") should equal("world") + } + } +} + +class MyJavaSerializableActor extends Actor with scala.Serializable { + var count = 0 + self.receiveTimeout = Some(1000) + + def receive = { + case "hello" ⇒ + count = count + 1 + self.reply("world " + count) + } +} + +class MyStatelessActorWithMessagesInMailbox extends Actor with scala.Serializable { + def receive = { + case "hello" ⇒ + Thread.sleep(500) + case "hello-reply" ⇒ self.reply("world") + } +} + +class MyActorWithProtobufMessagesInMailbox extends Actor with scala.Serializable { + def receive = { + case m: Message ⇒ + Thread.sleep(500) + case "hello-reply" ⇒ self.reply("world") + } +} + +class PersonActorWithMessagesInMailbox extends Actor with scala.Serializable { + def receive = { + case p: Person ⇒ + Thread.sleep(500) + case "hello-reply" ⇒ self.reply("hello") + } +} diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 06a24035eb..9e69f3faf3 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -682,6 +682,9 @@ class AkkaParentProject(info: ProjectInfo) extends ParentProject(info) with Exec val junit = Dependencies.junit val scalatest = Dependencies.scalatest val multiverse_test = Dependencies.multiverse_test // StandardLatch + val protobuf = Dependencies.protobuf + val jackson = Dependencies.jackson + val sjson = Dependencies.sjson override def compileOptions = super.compileOptions ++ compileOptions("-P:continuations:enable") }