Deep serialization of Actors now works

This commit is contained in:
Jonas Bonér 2010-06-10 16:00:17 +02:00
parent fe9109d82f
commit 9d7877dcba
7 changed files with 716 additions and 234 deletions

View file

@ -14,7 +14,7 @@ option optimize_for = SPEED;
/**
* Defines the serialization scheme used to serialize the message and/or Actor instance.
*/
enum SerializationSchemeProtocol {
enum SerializationSchemeType {
JAVA = 1;
SBINARY = 2;
SCALA_JSON = 3;
@ -23,15 +23,15 @@ enum SerializationSchemeProtocol {
}
/**
* Defines a the life-cycle of a supervised Actor.
* Defines the type of the life-cycle of a supervised Actor.
*/
enum LifeCycleProtocol {
enum LifeCycleType {
PERMANENT = 1;
TEMPORARY = 2;
}
/*
enum DispatcherProtocol {
enum DispatcherType {
GLOBAL_EVENT_EXECUTOR_BASED = 1;
GLOBAL_REACTOR_SINGLE_THREAD_BASED = 2;
GLOBAL_REACTOR_THREAD_POOL_BASED = 3;
@ -40,11 +40,20 @@ enum DispatcherProtocol {
}
*/
/**
* Defines the life-cycle of a supervised Actor.
*/
message LifeCycleProtocol {
required LifeCycleType lifeCycle = 1;
optional string preRestart = 2;
optional string postRestart = 3;
}
/**
* Defines a remote address.
*/
message AddressProtocol {
required string hostname = 1;
required string hostname = 1;
required uint32 port = 2;
}
@ -61,7 +70,7 @@ message ExceptionProtocol {
*/
message RemoteActorRefProtocol {
required string uuid = 1;
required string actorClassName = 2;
required string actorClassname = 2;
required AddressProtocol homeAddress = 3;
optional uint64 timeout = 4;
}
@ -73,9 +82,9 @@ message RemoteActorRefProtocol {
message SerializedActorRefProtocol {
required string uuid = 1;
required string id = 2;
required string actorClassName = 3;
required string actorClassname = 3;
required bytes actorInstance = 4;
required SerializationSchemeProtocol serializationScheme = 5;
required string serializerClassname = 5;
required AddressProtocol originalAddress = 6;
optional bool isTransactor = 7;
optional uint64 timeout = 8;
@ -89,7 +98,7 @@ message SerializedActorRefProtocol {
*/
message RemoteRequestProtocol {
required uint64 id = 1;
required SerializationSchemeProtocol serializationScheme = 2;
required SerializationSchemeType serializationScheme = 2;
required bytes message = 3;
optional bytes messageManifest = 4;
optional string method = 5;
@ -108,7 +117,7 @@ message RemoteRequestProtocol {
*/
message RemoteReplyProtocol {
required uint64 id = 1;
optional SerializationSchemeProtocol serializationScheme = 2;
optional SerializationSchemeType serializationScheme = 2;
optional bytes message = 3;
optional bytes messageManifest = 4;
optional ExceptionProtocol exception = 5;

View file

@ -64,7 +64,7 @@ trait ProtobufSerializableActor[T] extends SerializableActor[T] { this: Message
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait JavaSerializableActor[T] extends SerializableActor[T] {
val serializer = Serializer.Java
@transient val serializer = Serializer.Java
def toBinary: Array[Byte] = serializer.toBinary(this)
}
@ -342,7 +342,7 @@ trait Actor extends Logging {
* Mainly for internal use, functions as the implicit sender references when invoking
* one of the message send functions ('!', '!!' and '!!!').
*/
implicit val optionSelf: Option[ActorRef] = {
@transient implicit val optionSelf: Option[ActorRef] = {
val ref = Actor.actorRefInCreation.value
Actor.actorRefInCreation.value = None
if (ref.isEmpty) throw new ActorInitializationException(
@ -351,7 +351,7 @@ trait Actor extends Logging {
"\n\tYou have to use one of the factory methods in the 'Actor' object to create a new actor." +
"\n\tEither use:" +
"\n\t\t'val actor = Actor.actorOf[MyActor]', or" +
"\n\t\t'val actor = Actor.actorOf(new MyActor(..))'" +
"\n\t\t'val actor = Actor.actorOf(new MyActor(..))', or" +
"\n\t\t'val actor = Actor.actor { case msg => .. } }'")
else ref
}
@ -362,7 +362,7 @@ trait Actor extends Logging {
* Mainly for internal use, functions as the implicit sender references when invoking
* the 'forward' function.
*/
implicit val someSelf: Some[ActorRef] = optionSelf.asInstanceOf[Some[ActorRef]]
@transient implicit val someSelf: Some[ActorRef] = optionSelf.asInstanceOf[Some[ActorRef]]
/**
* The 'self' field holds the ActorRef for this actor.
@ -391,7 +391,7 @@ trait Actor extends Logging {
* self.stop(..)
* </pre>
*/
val self: ActorRef = {
@transient val self: ActorRef = {
val zelf = optionSelf.get
zelf.id = getClass.getName
zelf

View file

@ -28,6 +28,8 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.{Map => JMap}
import java.lang.reflect.Field
import com.google.protobuf.ByteString
/**
* The ActorRef object can be used to deserialize ActorRef instances from of its binary representation
* or its Protocol Buffers (protobuf) Message representation to a Actor.actorOf instance.
@ -62,7 +64,7 @@ object ActorRef {
private[akka] def fromProtobuf(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef =
RemoteActorRef(
protocol.getUuid,
protocol.getActorClassName,
protocol.getActorClassname,
protocol.getHomeAddress.getHostname,
protocol.getHomeAddress.getPort,
protocol.getTimeout,
@ -211,6 +213,11 @@ trait ActorRef extends TransactionManagement {
protected[akka] def sender_=(s: Option[ActorRef]) = guard.withGuard { _sender = s }
protected[akka] def senderFuture_=(sf: Option[CompletableFuture[Any]]) = guard.withGuard { _senderFuture = sf }
/**
* Returns the uuid for the actor.
*/
def uuid = _uuid
/**
* The reference sender Actor of the last received message.
* Is defined if the message was sent from another Actor, else None.
@ -239,15 +246,24 @@ trait ActorRef extends TransactionManagement {
def isShutdown: Boolean = _isShutDown
/**
* Returns the uuid for the actor.
*/
def uuid = _uuid
/**
* Tests if the actor is able to handle the message passed in as arguments.
* Is the actor able to handle the message passed in as arguments?
*/
def isDefinedAt(message: Any): Boolean = actor.base.isDefinedAt(message)
/**
* Is the actor is serializable?
*/
def isSerializable: Boolean = actor.isInstanceOf[SerializableActor[_]]
/**
* Returns the 'Serializer' instance for the Actor as an Option.
* <p/>
* It returns 'Some(serializer)' if the Actor is serializable and 'None' if not.
*/
def serializer: Option[Serializer] =
if (isSerializable) Some(actor.asInstanceOf[SerializableActor[_]].serializer)
else None
/**
* Only for internal use. UUID is effectively final.
*/
@ -517,7 +533,9 @@ trait ActorRef extends TransactionManagement {
*/
def shutdownLinkedActors: Unit
protected[akka] def toProtobuf: RemoteActorRefProtocol
protected[akka] def toRemoteActorRefProtocol: RemoteActorRefProtocol
protected[akka] def toSerializedActorRefProtocol: SerializedActorRefProtocol
protected[akka] def invoke(messageHandle: MessageInvocation): Unit
@ -562,7 +580,7 @@ trait ActorRef extends TransactionManagement {
protected def processSender(senderOption: Option[ActorRef], requestBuilder: RemoteRequestProtocol.Builder) = {
senderOption.foreach { sender =>
RemoteServer.getOrCreateServer(sender.homeAddress).register(sender.uuid, sender)
requestBuilder.setSender(sender.toProtobuf)
requestBuilder.setSender(sender.toRemoteActorRefProtocol)
}
}
}
@ -599,7 +617,7 @@ sealed class LocalActorRef private[akka](
/**
* Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message.
*/
protected[akka] def toProtobuf: RemoteActorRefProtocol = guard.withGuard {
protected[akka] def toRemoteActorRefProtocol: RemoteActorRefProtocol = guard.withGuard {
val host = homeAddress.getHostName
val port = homeAddress.getPort
@ -609,14 +627,60 @@ sealed class LocalActorRef private[akka](
RemoteServer.registerActor(homeAddress, uuid, this)
registeredInRemoteNodeDuringSerialization = true
}
RemoteActorRefProtocol.newBuilder
.setUuid(uuid)
.setActorClassName(actorClass.getName)
.setActorClassname(actorClass.getName)
.setHomeAddress(AddressProtocol.newBuilder.setHostname(host).setPort(port).build)
.setTimeout(timeout)
.build
}
protected[akka] def toSerializedActorRefProtocol: SerializedActorRefProtocol = guard.withGuard {
if (!isSerializable) throw new IllegalStateException("Can't serialize an ActorRef using SerializedActorRefProtocol that is wrapping an Actor that is not mixing in the SerializableActor trait")
val lifeCycleProtocol: Option[LifeCycleProtocol] = {
def setScope(builder: LifeCycleProtocol.Builder, scope: Scope) = scope match {
case Permanent => builder.setLifeCycle(LifeCycleType.PERMANENT)
case Temporary => builder.setLifeCycle(LifeCycleType.TEMPORARY)
}
val builder = LifeCycleProtocol.newBuilder
lifeCycle match {
case Some(LifeCycle(scope, None)) =>
setScope(builder, scope)
Some(builder.build)
case Some(LifeCycle(scope, Some(callbacks))) =>
setScope(builder, scope)
builder.setPreRestart(callbacks.preRestart)
builder.setPostRestart(callbacks.postRestart)
Some(builder.build)
case None =>
None
}
}
val serializerClassname = serializer
.getOrElse(throw new IllegalStateException("Can't serialize Actor [" + toString + "] - no 'Serializer' defined"))
.getClass.getName
val originalAddress = AddressProtocol.newBuilder.setHostname(homeAddress.getHostName).setPort(homeAddress.getPort).build
val builder = SerializedActorRefProtocol.newBuilder
.setUuid(uuid)
.setId(id)
.setActorClassname(actorClass.getName)
.setActorInstance(ByteString.copyFrom(actor.asInstanceOf[SerializableActor[_]].toBinary))
.setSerializerClassname(serializerClassname)
.setOriginalAddress(originalAddress)
.setIsTransactor(isTransactor)
.setTimeout(timeout)
lifeCycleProtocol.foreach(builder.setLifeCycle(_))
supervisor.foreach(sup => builder.setSupervisor(sup.toRemoteActorRefProtocol))
// FIXME: how to serialize the hotswap PartialFunction ??
// hotswap.foreach(builder.setHotswapStack(_))
builder.build
}
/**
* Returns the mailbox.
*/
@ -625,8 +689,11 @@ sealed class LocalActorRef private[akka](
/**
* Serializes the ActorRef instance into a byte array (Array[Byte]).
*/
def toBinary: Array[Byte] = toProtobuf.toByteArray
def toBinary: Array[Byte] = {
if (isSerializable) toSerializedActorRefProtocol.toByteArray
else toRemoteActorRefProtocol.toByteArray
}
/**
* Returns the class for the Actor instance that is managed by the ActorRef.
*/
@ -938,7 +1005,7 @@ sealed class LocalActorRef private[akka](
.setIsOneWay(false)
.setIsEscaped(false)
//senderOption.foreach(sender => requestBuilder.setSender(sender.toProtobuf))
//senderOption.foreach(sender => requestBuilder.setSender(sender.toRemoteActorRefProtocol))
RemoteProtocolBuilder.setMessage(message, requestBuilder)
val id = registerSupervisorAsRemoteActor
@ -1277,7 +1344,8 @@ private[akka] case class RemoteActorRef private[akka] (
def mailboxSize: Int = unsupported
def supervisor: Option[ActorRef] = unsupported
def shutdownLinkedActors: Unit = unsupported
protected[akka] def toProtobuf: RemoteActorRefProtocol = unsupported
protected[akka] def toRemoteActorRefProtocol: RemoteActorRefProtocol = unsupported
protected[akka] def toSerializedActorRefProtocol: SerializedActorRefProtocol = unsupported
protected[akka] def mailbox: Deque[MessageInvocation] = unsupported
protected[akka] def restart(reason: Throwable): Unit = unsupported
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported

View file

@ -25,21 +25,21 @@ object RemoteProtocolBuilder {
def getMessage(request: RemoteRequestProtocol): Any = {
request.getSerializationScheme match {
case SerializationSchemeProtocol.JAVA =>
case SerializationSchemeType.JAVA =>
unbox(SERIALIZER_JAVA.fromBinary(request.getMessage.toByteArray, None))
case SerializationSchemeProtocol.SBINARY =>
case SerializationSchemeType.SBINARY =>
val classToLoad = new String(request.getMessageManifest.toByteArray)
val clazz = if (SERIALIZER_SBINARY.classLoader.isDefined) SERIALIZER_SBINARY.classLoader.get.loadClass(classToLoad)
else Class.forName(classToLoad)
val renderer = clazz.newInstance.asInstanceOf[Serializable.SBinary[_ <: AnyRef]]
renderer.fromBytes(request.getMessage.toByteArray)
case SerializationSchemeProtocol.SCALA_JSON =>
case SerializationSchemeType.SCALA_JSON =>
val manifest = SERIALIZER_JAVA.fromBinary(request.getMessageManifest.toByteArray, None).asInstanceOf[String]
SERIALIZER_SCALA_JSON.fromBinary(request.getMessage.toByteArray, Some(Class.forName(manifest)))
case SerializationSchemeProtocol.JAVA_JSON =>
case SerializationSchemeType.JAVA_JSON =>
val manifest = SERIALIZER_JAVA.fromBinary(request.getMessageManifest.toByteArray, None).asInstanceOf[String]
SERIALIZER_JAVA_JSON.fromBinary(request.getMessage.toByteArray, Some(Class.forName(manifest)))
case SerializationSchemeProtocol.PROTOBUF =>
case SerializationSchemeType.PROTOBUF =>
val messageClass = SERIALIZER_JAVA.fromBinary(request.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]]
SERIALIZER_PROTOBUF.fromBinary(request.getMessage.toByteArray, Some(messageClass))
}
@ -47,21 +47,21 @@ object RemoteProtocolBuilder {
def getMessage(reply: RemoteReplyProtocol): Any = {
reply.getSerializationScheme match {
case SerializationSchemeProtocol.JAVA =>
case SerializationSchemeType.JAVA =>
unbox(SERIALIZER_JAVA.fromBinary(reply.getMessage.toByteArray, None))
case SerializationSchemeProtocol.SBINARY =>
case SerializationSchemeType.SBINARY =>
val classToLoad = new String(reply.getMessageManifest.toByteArray)
val clazz = if (SERIALIZER_SBINARY.classLoader.isDefined) SERIALIZER_SBINARY.classLoader.get.loadClass(classToLoad)
else Class.forName(classToLoad)
val renderer = clazz.newInstance.asInstanceOf[Serializable.SBinary[_ <: AnyRef]]
renderer.fromBytes(reply.getMessage.toByteArray)
case SerializationSchemeProtocol.SCALA_JSON =>
case SerializationSchemeType.SCALA_JSON =>
val manifest = SERIALIZER_JAVA.fromBinary(reply.getMessageManifest.toByteArray, None).asInstanceOf[String]
SERIALIZER_SCALA_JSON.fromBinary(reply.getMessage.toByteArray, Some(Class.forName(manifest)))
case SerializationSchemeProtocol.JAVA_JSON =>
case SerializationSchemeType.JAVA_JSON =>
val manifest = SERIALIZER_JAVA.fromBinary(reply.getMessageManifest.toByteArray, None).asInstanceOf[String]
SERIALIZER_JAVA_JSON.fromBinary(reply.getMessage.toByteArray, Some(Class.forName(manifest)))
case SerializationSchemeProtocol.PROTOBUF =>
case SerializationSchemeType.PROTOBUF =>
val messageClass = SERIALIZER_JAVA.fromBinary(reply.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]]
SERIALIZER_PROTOBUF.fromBinary(reply.getMessage.toByteArray, Some(messageClass))
}
@ -70,27 +70,27 @@ object RemoteProtocolBuilder {
def setMessage(message: Any, builder: RemoteRequestProtocol.Builder) = {
if (message.isInstanceOf[Serializable.SBinary[_]]) {
val serializable = message.asInstanceOf[Serializable.SBinary[_ <: Any]]
builder.setSerializationScheme(SerializationSchemeProtocol.SBINARY)
builder.setSerializationScheme(SerializationSchemeType.SBINARY)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
} else if (message.isInstanceOf[Message]) {
val serializable = message.asInstanceOf[Message]
builder.setSerializationScheme(SerializationSchemeProtocol.PROTOBUF)
builder.setSerializationScheme(SerializationSchemeType.PROTOBUF)
builder.setMessage(ByteString.copyFrom(serializable.toByteArray))
builder.setMessageManifest(ByteString.copyFrom(SERIALIZER_JAVA.toBinary(serializable.getClass)))
} else if (message.isInstanceOf[Serializable.ScalaJSON]) {
val serializable = message.asInstanceOf[Serializable.ScalaJSON]
builder.setSerializationScheme(SerializationSchemeProtocol.SCALA_JSON)
builder.setSerializationScheme(SerializationSchemeType.SCALA_JSON)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
} else if (message.isInstanceOf[Serializable.JavaJSON]) {
val serializable = message.asInstanceOf[Serializable.JavaJSON]
builder.setSerializationScheme(SerializationSchemeProtocol.JAVA_JSON)
builder.setSerializationScheme(SerializationSchemeType.JAVA_JSON)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
} else {
// default, e.g. if no protocol used explicitly then use Java serialization
builder.setSerializationScheme(SerializationSchemeProtocol.JAVA)
builder.setSerializationScheme(SerializationSchemeType.JAVA)
builder.setMessage(ByteString.copyFrom(SERIALIZER_JAVA.toBinary(box(message))))
}
}
@ -98,27 +98,27 @@ object RemoteProtocolBuilder {
def setMessage(message: Any, builder: RemoteReplyProtocol.Builder) = {
if (message.isInstanceOf[Serializable.SBinary[_]]) {
val serializable = message.asInstanceOf[Serializable.SBinary[_ <: Any]]
builder.setSerializationScheme(SerializationSchemeProtocol.SBINARY)
builder.setSerializationScheme(SerializationSchemeType.SBINARY)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
} else if (message.isInstanceOf[Message]) {
val serializable = message.asInstanceOf[Message]
builder.setSerializationScheme(SerializationSchemeProtocol.PROTOBUF)
builder.setSerializationScheme(SerializationSchemeType.PROTOBUF)
builder.setMessage(ByteString.copyFrom(serializable.toByteArray))
builder.setMessageManifest(ByteString.copyFrom(SERIALIZER_JAVA.toBinary(serializable.getClass)))
} else if (message.isInstanceOf[Serializable.ScalaJSON]) {
val serializable = message.asInstanceOf[Serializable.ScalaJSON]
builder.setSerializationScheme(SerializationSchemeProtocol.SCALA_JSON)
builder.setSerializationScheme(SerializationSchemeType.SCALA_JSON)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
} else if (message.isInstanceOf[Serializable.JavaJSON]) {
val serializable = message.asInstanceOf[Serializable.JavaJSON]
builder.setSerializationScheme(SerializationSchemeProtocol.JAVA_JSON)
builder.setSerializationScheme(SerializationSchemeType.JAVA_JSON)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
} else {
// default, e.g. if no protocol used explicitly then use Java serialization
builder.setSerializationScheme(SerializationSchemeProtocol.JAVA)
builder.setSerializationScheme(SerializationSchemeType.JAVA)
builder.setMessage(ByteString.copyFrom(SERIALIZER_JAVA.toBinary(box(message))))
}
}

View file

@ -123,7 +123,7 @@ object Serializer {
obj
}
def fromBinary(json: String, clazz: Class[_]): AnyRef = {
def fromJSON(json: String, clazz: Class[_]): AnyRef = {
if (clazz eq null) throw new IllegalArgumentException("Can't deserialize JSON to instance if no class is provided")
mapper.readValue(json, clazz).asInstanceOf[AnyRef]
}
@ -140,7 +140,7 @@ object Serializer {
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = SJSONSerializer.SJSON.in(bytes)
import scala.reflect.Manifest
def fromBinary[T](json: String)(implicit m: Manifest[T]): AnyRef = {
def fromJSON[T](json: String)(implicit m: Manifest[T]): AnyRef = {
SJSONSerializer.SJSON.in(json)(m)
}

View file

@ -22,20 +22,20 @@ class SerializerSpec extends JUnitSuite {
@Test
def shouldSerializeString = {
val f = Foo("debasish")
val json = Serializer.ScalaJSON.out(f)
val json = Serializer.ScalaJSON.toBinary(f)
assert(new String(json) == """{"foo":"debasish"}""")
val fo = Serializer.ScalaJSON.in[Foo](new String(json)).asInstanceOf[Foo]
val fo = Serializer.ScalaJSON.fromJSON[Foo](new String(json)).asInstanceOf[Foo]
assert(fo == f)
}
@Test
def shouldSerializeTuple2 = {
val message = MyMessage("id", ("hello", 34))
val json = Serializer.ScalaJSON.out(message)
val json = Serializer.ScalaJSON.toBinary(message)
assert(new String(json) == """{"id":"id","value":{"hello":34}}""")
val f = Serializer.ScalaJSON.in[MyMessage](new String(json)).asInstanceOf[MyMessage]
val f = Serializer.ScalaJSON.fromJSON[MyMessage](new String(json)).asInstanceOf[MyMessage]
assert(f == message)
val g = Serializer.ScalaJSON.in[MyMessage](json).asInstanceOf[MyMessage]
val g = Serializer.ScalaJSON.fromBinary[MyMessage](json).asInstanceOf[MyMessage]
assert(f == message)
}
}