closing #364, serializiation for typed actor proxy ref
This commit is contained in:
parent
e6974cc3b7
commit
b78658ad29
5 changed files with 1149 additions and 111 deletions
|
|
@ -4,7 +4,6 @@
|
|||
|
||||
package se.scalablesolutions.akka.serialization
|
||||
|
||||
import se.scalablesolutions.akka.actor.{Actor, ActorRef, LocalActorRef, RemoteActorRef, IllegalActorStateException, ActorType}
|
||||
import se.scalablesolutions.akka.stm.global._
|
||||
import se.scalablesolutions.akka.stm.TransactionManagement._
|
||||
import se.scalablesolutions.akka.stm.TransactionManagement
|
||||
|
|
@ -15,6 +14,7 @@ import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, F
|
|||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
|
||||
import com.google.protobuf.ByteString
|
||||
import se.scalablesolutions.akka.actor._
|
||||
|
||||
/**
|
||||
* Type class definition for Actor Serialization
|
||||
|
|
@ -36,13 +36,14 @@ trait Format[T <: Actor] extends FromBinary[T] with ToBinary[T]
|
|||
* Create a Format object with the client actor as the implementation of the type class
|
||||
*
|
||||
* <pre>
|
||||
* object BinaryFormatMyStatelessActor {
|
||||
* object BinaryFormatMyStatelessActor {
|
||||
* implicit object MyStatelessActorFormat extends StatelessActorFormat[MyStatelessActor]
|
||||
* }
|
||||
* </pre>
|
||||
*/
|
||||
trait StatelessActorFormat[T <: Actor] extends Format[T] {
|
||||
def fromBinary(bytes: Array[Byte], act: T) = act
|
||||
|
||||
def toBinary(ac: T) = Array.empty[Byte]
|
||||
}
|
||||
|
||||
|
|
@ -53,16 +54,18 @@ trait StatelessActorFormat[T <: Actor] extends Format[T] {
|
|||
* a serializer object
|
||||
*
|
||||
* <pre>
|
||||
* object BinaryFormatMyJavaSerializableActor {
|
||||
* implicit object MyJavaSerializableActorFormat extends SerializerBasedActorFormat[MyJavaSerializableActor] {
|
||||
* object BinaryFormatMyJavaSerializableActor {
|
||||
* implicit object MyJavaSerializableActorFormat extends SerializerBasedActorFormat[MyJavaSerializableActor] {
|
||||
* val serializer = Serializer.Java
|
||||
* }
|
||||
* }
|
||||
* }
|
||||
* </pre>
|
||||
*/
|
||||
trait SerializerBasedActorFormat[T <: Actor] extends Format[T] {
|
||||
val serializer: Serializer
|
||||
|
||||
def fromBinary(bytes: Array[Byte], act: T) = serializer.fromBinary(bytes, Some(act.self.actorClass)).asInstanceOf[T]
|
||||
|
||||
def toBinary(ac: T) = serializer.toBinary(ac)
|
||||
}
|
||||
|
||||
|
|
@ -70,7 +73,6 @@ trait SerializerBasedActorFormat[T <: Actor] extends Format[T] {
|
|||
* Module for local actor serialization.
|
||||
*/
|
||||
object ActorSerialization {
|
||||
|
||||
def fromBinary[T <: Actor](bytes: Array[Byte])(implicit format: Format[T]): ActorRef =
|
||||
fromBinaryToLocalActorRef(bytes, format)
|
||||
|
||||
|
|
@ -85,8 +87,8 @@ object ActorSerialization {
|
|||
def toBinaryJ[T <: Actor](a: ActorRef, format: Format[T]): Array[Byte] =
|
||||
toBinary(a)(format)
|
||||
|
||||
private def toSerializedActorRefProtocol[T <: Actor](
|
||||
actorRef: ActorRef, format: Format[T]): SerializedActorRefProtocol = {
|
||||
private[akka] def toSerializedActorRefProtocol[T <: Actor](
|
||||
actorRef: ActorRef, format: Format[T]): SerializedActorRefProtocol = {
|
||||
val lifeCycleProtocol: Option[LifeCycleProtocol] = {
|
||||
def setScope(builder: LifeCycleProtocol.Builder, scope: Scope) = scope match {
|
||||
case Permanent => builder.setLifeCycle(LifeCycleType.PERMANENT)
|
||||
|
|
@ -102,17 +104,17 @@ object ActorSerialization {
|
|||
}
|
||||
|
||||
val originalAddress = AddressProtocol.newBuilder
|
||||
.setHostname(actorRef.homeAddress.getHostName)
|
||||
.setPort(actorRef.homeAddress.getPort)
|
||||
.build
|
||||
.setHostname(actorRef.homeAddress.getHostName)
|
||||
.setPort(actorRef.homeAddress.getPort)
|
||||
.build
|
||||
|
||||
val builder = SerializedActorRefProtocol.newBuilder
|
||||
.setUuid(actorRef.uuid)
|
||||
.setId(actorRef.id)
|
||||
.setActorClassname(actorRef.actorClass.getName)
|
||||
.setOriginalAddress(originalAddress)
|
||||
.setIsTransactor(actorRef.isTransactor)
|
||||
.setTimeout(actorRef.timeout)
|
||||
.setUuid(actorRef.uuid)
|
||||
.setId(actorRef.id)
|
||||
.setActorClassname(actorRef.actorClass.getName)
|
||||
.setOriginalAddress(originalAddress)
|
||||
.setIsTransactor(actorRef.isTransactor)
|
||||
.setTimeout(actorRef.timeout)
|
||||
|
||||
actorRef.receiveTimeout.foreach(builder.setReceiveTimeout(_))
|
||||
builder.setActorInstance(ByteString.copyFrom(format.toBinary(actorRef.actor.asInstanceOf[T])))
|
||||
|
|
@ -126,33 +128,33 @@ object ActorSerialization {
|
|||
private def fromBinaryToLocalActorRef[T <: Actor](bytes: Array[Byte], format: Format[T]): ActorRef =
|
||||
fromProtobufToLocalActorRef(SerializedActorRefProtocol.newBuilder.mergeFrom(bytes).build, format, None)
|
||||
|
||||
private def fromProtobufToLocalActorRef[T <: Actor](
|
||||
protocol: SerializedActorRefProtocol, format: Format[T], loader: Option[ClassLoader]): ActorRef = {
|
||||
private[akka] def fromProtobufToLocalActorRef[T <: Actor](
|
||||
protocol: SerializedActorRefProtocol, format: Format[T], loader: Option[ClassLoader]): ActorRef = {
|
||||
Actor.log.debug("Deserializing SerializedActorRefProtocol to LocalActorRef:\n" + protocol)
|
||||
|
||||
val serializer =
|
||||
if (format.isInstanceOf[SerializerBasedActorFormat[_]])
|
||||
Some(format.asInstanceOf[SerializerBasedActorFormat[_]].serializer)
|
||||
else None
|
||||
if (format.isInstanceOf[SerializerBasedActorFormat[_]])
|
||||
Some(format.asInstanceOf[SerializerBasedActorFormat[_]].serializer)
|
||||
else None
|
||||
|
||||
val lifeCycle =
|
||||
if (protocol.hasLifeCycle) {
|
||||
val lifeCycleProtocol = protocol.getLifeCycle
|
||||
Some(if (lifeCycleProtocol.getLifeCycle == LifeCycleType.PERMANENT) LifeCycle(Permanent)
|
||||
else if (lifeCycleProtocol.getLifeCycle == LifeCycleType.TEMPORARY) LifeCycle(Temporary)
|
||||
else throw new IllegalActorStateException("LifeCycle type is not valid: " + lifeCycleProtocol.getLifeCycle))
|
||||
} else None
|
||||
if (protocol.hasLifeCycle) {
|
||||
val lifeCycleProtocol = protocol.getLifeCycle
|
||||
Some(if (lifeCycleProtocol.getLifeCycle == LifeCycleType.PERMANENT) LifeCycle(Permanent)
|
||||
else if (lifeCycleProtocol.getLifeCycle == LifeCycleType.TEMPORARY) LifeCycle(Temporary)
|
||||
else throw new IllegalActorStateException("LifeCycle type is not valid: " + lifeCycleProtocol.getLifeCycle))
|
||||
} else None
|
||||
|
||||
val supervisor =
|
||||
if (protocol.hasSupervisor)
|
||||
Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(protocol.getSupervisor, loader))
|
||||
else None
|
||||
if (protocol.hasSupervisor)
|
||||
Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(protocol.getSupervisor, loader))
|
||||
else None
|
||||
|
||||
val hotswap =
|
||||
if (serializer.isDefined && protocol.hasHotswapStack) Some(serializer.get
|
||||
if (serializer.isDefined && protocol.hasHotswapStack) Some(serializer.get
|
||||
.fromBinary(protocol.getHotswapStack.toByteArray, Some(classOf[PartialFunction[Any, Unit]]))
|
||||
.asInstanceOf[PartialFunction[Any, Unit]])
|
||||
else None
|
||||
else None
|
||||
|
||||
val classLoader = loader.getOrElse(getClass.getClassLoader)
|
||||
|
||||
|
|
@ -194,9 +196,9 @@ object RemoteActorSerialization {
|
|||
def fromBinaryToRemoteActorRef(bytes: Array[Byte]): ActorRef =
|
||||
fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, None)
|
||||
|
||||
/**
|
||||
* Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance.
|
||||
*/
|
||||
/**
|
||||
* Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance.
|
||||
*/
|
||||
def fromBinaryToRemoteActorRef(bytes: Array[Byte], loader: ClassLoader): ActorRef =
|
||||
fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader))
|
||||
|
||||
|
|
@ -230,21 +232,21 @@ object RemoteActorSerialization {
|
|||
}
|
||||
|
||||
RemoteActorRefProtocol.newBuilder
|
||||
.setUuid(uuid)
|
||||
.setActorClassname(actorClass.getName)
|
||||
.setHomeAddress(AddressProtocol.newBuilder.setHostname(host).setPort(port).build)
|
||||
.setTimeout(timeout)
|
||||
.build
|
||||
.setUuid(uuid)
|
||||
.setActorClassname(actorClass.getName)
|
||||
.setHomeAddress(AddressProtocol.newBuilder.setHostname(host).setPort(port).build)
|
||||
.setTimeout(timeout)
|
||||
.build
|
||||
}
|
||||
|
||||
def createRemoteRequestProtocolBuilder(
|
||||
actorRef: ActorRef,
|
||||
message: Any,
|
||||
isOneWay: Boolean,
|
||||
senderOption: Option[ActorRef],
|
||||
typedActorInfo: Option[Tuple2[String, String]],
|
||||
actorType: ActorType):
|
||||
RemoteRequestProtocol.Builder = {
|
||||
actorRef: ActorRef,
|
||||
message: Any,
|
||||
isOneWay: Boolean,
|
||||
senderOption: Option[ActorRef],
|
||||
typedActorInfo: Option[Tuple2[String, String]],
|
||||
actorType: ActorType):
|
||||
RemoteRequestProtocol.Builder = {
|
||||
import actorRef._
|
||||
|
||||
val actorInfoBuilder = ActorInfoProtocol.newBuilder
|
||||
|
|
@ -253,12 +255,13 @@ object RemoteActorSerialization {
|
|||
.setTarget(actorClassName)
|
||||
.setTimeout(timeout)
|
||||
|
||||
typedActorInfo.foreach { typedActor =>
|
||||
actorInfoBuilder.setTypedActorInfo(
|
||||
TypedActorInfoProtocol.newBuilder
|
||||
.setInterface(typedActor._1)
|
||||
.setMethod(typedActor._2)
|
||||
.build)
|
||||
typedActorInfo.foreach {
|
||||
typedActor =>
|
||||
actorInfoBuilder.setTypedActorInfo(
|
||||
TypedActorInfoProtocol.newBuilder
|
||||
.setInterface(typedActor._1)
|
||||
.setMethod(typedActor._2)
|
||||
.build)
|
||||
}
|
||||
|
||||
actorType match {
|
||||
|
|
@ -276,10 +279,110 @@ object RemoteActorSerialization {
|
|||
val id = registerSupervisorAsRemoteActor
|
||||
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
|
||||
|
||||
senderOption.foreach { sender =>
|
||||
RemoteServer.getOrCreateServer(sender.homeAddress).register(sender.uuid, sender)
|
||||
requestBuilder.setSender(toRemoteActorRefProtocol(sender))
|
||||
senderOption.foreach {
|
||||
sender =>
|
||||
RemoteServer.getOrCreateServer(sender.homeAddress).register(sender.uuid, sender)
|
||||
requestBuilder.setSender(toRemoteActorRefProtocol(sender))
|
||||
}
|
||||
requestBuilder
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Module for local typed actor serialization.
|
||||
*/
|
||||
object TypedActorSerialization {
|
||||
|
||||
def fromBinary[T <: Actor, U <: AnyRef](bytes: Array[Byte])(implicit format: Format[T]): U =
|
||||
fromBinaryToLocalTypedActorRef(bytes, format)
|
||||
|
||||
def toBinary[T <: Actor](proxy: AnyRef)(implicit format: Format[T]): Array[Byte] = {
|
||||
toSerializedTypedActorRefProtocol(proxy, format).toByteArray
|
||||
}
|
||||
|
||||
// wrapper for implicits to be used by Java
|
||||
def fromBinaryJ[T <: Actor, U <: AnyRef](bytes: Array[Byte], format: Format[T]): U =
|
||||
fromBinary(bytes)(format)
|
||||
|
||||
// wrapper for implicits to be used by Java
|
||||
def toBinaryJ[T <: Actor](a: AnyRef, format: Format[T]): Array[Byte] =
|
||||
toBinary(a)(format)
|
||||
|
||||
private def toSerializedTypedActorRefProtocol[T <: Actor](
|
||||
proxy: AnyRef, format: Format[T]): SerializedTypedActorRefProtocol = {
|
||||
|
||||
val init = AspectInitRegistry.initFor(proxy)
|
||||
if (init == null) throw new IllegalArgumentException("Proxy for typed actor could not be found in AspectInitRegistry.")
|
||||
|
||||
SerializedTypedActorRefProtocol.newBuilder
|
||||
.setActorRef(ActorSerialization.toSerializedActorRefProtocol(init.actorRef, format))
|
||||
.setInterfaceName(init.interfaceClass.getName)
|
||||
.build
|
||||
}
|
||||
|
||||
private def fromBinaryToLocalTypedActorRef[T <: Actor, U <: AnyRef](bytes: Array[Byte], format: Format[T]): U =
|
||||
fromProtobufToLocalTypedActorRef(SerializedTypedActorRefProtocol.newBuilder.mergeFrom(bytes).build, format, None)
|
||||
|
||||
private def fromProtobufToLocalTypedActorRef[T <: Actor, U <: AnyRef](
|
||||
protocol: SerializedTypedActorRefProtocol, format: Format[T], loader: Option[ClassLoader]): U = {
|
||||
Actor.log.debug("Deserializing SerializedTypedActorRefProtocol to LocalActorRef:\n" + protocol)
|
||||
val actorRef = ActorSerialization.fromProtobufToLocalActorRef(protocol.getActorRef, format, loader)
|
||||
val intfClass = toClass(loader, protocol.getInterfaceName)
|
||||
TypedActor.newInstance(intfClass, actorRef).asInstanceOf[U]
|
||||
}
|
||||
|
||||
private[akka] def toClass[U <: AnyRef](loader: Option[ClassLoader], name: String): Class[U] = {
|
||||
val classLoader = loader.getOrElse(getClass.getClassLoader)
|
||||
val clazz = classLoader.loadClass(name)
|
||||
clazz.asInstanceOf[Class[U]]
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Module for remote typed actor serialization.
|
||||
*/
|
||||
object RemoteTypedActorSerialization {
|
||||
/**
|
||||
* Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance.
|
||||
*/
|
||||
def fromBinaryToRemoteTypedActorRef[T <: AnyRef](bytes: Array[Byte]): T =
|
||||
fromProtobufToRemoteTypedActorRef(RemoteTypedActorRefProtocol.newBuilder.mergeFrom(bytes).build, None)
|
||||
|
||||
/**
|
||||
* Deserializes a byte array (Array[Byte]) into a AW RemoteActorRef proxy.
|
||||
*/
|
||||
def fromBinaryToRemoteTypedActorRef[T <: AnyRef](bytes: Array[Byte], loader: ClassLoader): T =
|
||||
fromProtobufToRemoteTypedActorRef(RemoteTypedActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader))
|
||||
|
||||
/**
|
||||
* Serialize as AW RemoteActorRef proxy.
|
||||
*/
|
||||
def toBinary[T <: Actor](proxy: AnyRef): Array[Byte] = {
|
||||
toRemoteTypedActorRefProtocol(proxy).toByteArray
|
||||
}
|
||||
|
||||
/**
|
||||
* Deserializes a RemoteTypedActorRefProtocol Protocol Buffers (protobuf) Message into AW RemoteActorRef proxy.
|
||||
*/
|
||||
private[akka] def fromProtobufToRemoteTypedActorRef[T](protocol: RemoteTypedActorRefProtocol, loader: Option[ClassLoader]): T = {
|
||||
Actor.log.debug("Deserializing RemoteTypedActorRefProtocol to AW RemoteActorRef proxy:\n" + protocol)
|
||||
val actorRef = RemoteActorSerialization.fromProtobufToRemoteActorRef(protocol.getActorRef, loader)
|
||||
val intfClass = TypedActorSerialization.toClass(loader, protocol.getInterfaceName)
|
||||
TypedActor.createProxyForRemoteActorRef(intfClass, actorRef).asInstanceOf[T]
|
||||
}
|
||||
|
||||
/**
|
||||
* Serializes the AW TypedActor proxy into a Protocol Buffers (protobuf) Message.
|
||||
*/
|
||||
def toRemoteTypedActorRefProtocol(proxy: AnyRef): RemoteTypedActorRefProtocol = {
|
||||
val init = AspectInitRegistry.initFor(proxy)
|
||||
RemoteTypedActorRefProtocol.newBuilder
|
||||
.setActorRef(RemoteActorSerialization.toRemoteActorRefProtocol(init.actorRef))
|
||||
.setInterfaceName(init.interfaceClass.getName)
|
||||
.build
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue