2010-08-28 16:48:27 +02:00
|
|
|
/**
|
|
|
|
|
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
|
|
|
|
*/
|
|
|
|
|
|
2010-09-01 19:22:39 +02:00
|
|
|
package se.scalablesolutions.akka.serialization
|
2010-08-28 16:48:27 +02:00
|
|
|
|
2010-09-01 19:22:39 +02:00
|
|
|
import se.scalablesolutions.akka.actor.{Actor, ActorRef, LocalActorRef, RemoteActorRef, IllegalActorStateException, ActorType}
|
2010-08-28 16:48:27 +02:00
|
|
|
import se.scalablesolutions.akka.stm.global._
|
|
|
|
|
import se.scalablesolutions.akka.stm.TransactionManagement._
|
|
|
|
|
import se.scalablesolutions.akka.stm.TransactionManagement
|
|
|
|
|
import se.scalablesolutions.akka.remote.{RemoteServer, RemoteRequestProtocolIdFactory, MessageSerializer}
|
2010-09-01 19:22:39 +02:00
|
|
|
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.{ActorType => ActorTypeProtocol, _}
|
|
|
|
|
import ActorTypeProtocol._
|
|
|
|
|
import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy}
|
|
|
|
|
import se.scalablesolutions.akka.config.ScalaConfig._
|
2010-08-28 16:48:27 +02:00
|
|
|
|
|
|
|
|
import com.google.protobuf.ByteString
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Type class definition for Actor Serialization
|
|
|
|
|
*/
|
|
|
|
|
trait FromBinary[T <: Actor] {
|
|
|
|
|
def fromBinary(bytes: Array[Byte], act: T): T
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
trait ToBinary[T <: Actor] {
|
|
|
|
|
def toBinary(t: T): Array[Byte]
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// client needs to implement Format[] for the respective actor
|
|
|
|
|
trait Format[T <: Actor] extends FromBinary[T] with ToBinary[T]
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* A default implementation for a stateless actor
|
|
|
|
|
*
|
|
|
|
|
* Create a Format object with the client actor as the implementation of the type class
|
|
|
|
|
*
|
|
|
|
|
* <pre>
|
|
|
|
|
* object BinaryFormatMyStatelessActor {
|
|
|
|
|
* implicit object MyStatelessActorFormat extends StatelessActorFormat[MyStatelessActor]
|
|
|
|
|
* }
|
|
|
|
|
* </pre>
|
|
|
|
|
*/
|
|
|
|
|
trait StatelessActorFormat[T <: Actor] extends Format[T] {
|
|
|
|
|
def fromBinary(bytes: Array[Byte], act: T) = act
|
|
|
|
|
def toBinary(ac: T) = Array.empty[Byte]
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* A default implementation of the type class for a Format that specifies a serializer
|
|
|
|
|
*
|
|
|
|
|
* Create a Format object with the client actor as the implementation of the type class and
|
|
|
|
|
* a serializer object
|
|
|
|
|
*
|
|
|
|
|
* <pre>
|
|
|
|
|
* object BinaryFormatMyJavaSerializableActor {
|
|
|
|
|
* implicit object MyJavaSerializableActorFormat extends SerializerBasedActorFormat[MyJavaSerializableActor] {
|
|
|
|
|
* val serializer = Serializer.Java
|
|
|
|
|
* }
|
|
|
|
|
* }
|
|
|
|
|
* </pre>
|
|
|
|
|
*/
|
|
|
|
|
trait SerializerBasedActorFormat[T <: Actor] extends Format[T] {
|
|
|
|
|
val serializer: Serializer
|
|
|
|
|
def fromBinary(bytes: Array[Byte], act: T) = serializer.fromBinary(bytes, Some(act.self.actorClass)).asInstanceOf[T]
|
|
|
|
|
def toBinary(ac: T) = serializer.toBinary(ac)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
2010-09-06 14:03:49 +02:00
|
|
|
* Module for local actor serialization.
|
2010-08-28 16:48:27 +02:00
|
|
|
*/
|
|
|
|
|
object ActorSerialization {
|
|
|
|
|
|
|
|
|
|
def fromBinary[T <: Actor](bytes: Array[Byte])(implicit format: Format[T]): ActorRef =
|
|
|
|
|
fromBinaryToLocalActorRef(bytes, format)
|
|
|
|
|
|
|
|
|
|
def toBinary[T <: Actor](a: ActorRef)(implicit format: Format[T]): Array[Byte] =
|
|
|
|
|
toSerializedActorRefProtocol(a, format).toByteArray
|
|
|
|
|
|
|
|
|
|
// wrapper for implicits to be used by Java
|
|
|
|
|
def fromBinaryJ[T <: Actor](bytes: Array[Byte], format: Format[T]): ActorRef =
|
|
|
|
|
fromBinary(bytes)(format)
|
|
|
|
|
|
|
|
|
|
// wrapper for implicits to be used by Java
|
|
|
|
|
def toBinaryJ[T <: Actor](a: ActorRef, format: Format[T]): Array[Byte] =
|
|
|
|
|
toBinary(a)(format)
|
|
|
|
|
|
|
|
|
|
private def toSerializedActorRefProtocol[T <: Actor](
|
|
|
|
|
actorRef: ActorRef, format: Format[T]): SerializedActorRefProtocol = {
|
|
|
|
|
val lifeCycleProtocol: Option[LifeCycleProtocol] = {
|
|
|
|
|
def setScope(builder: LifeCycleProtocol.Builder, scope: Scope) = scope match {
|
|
|
|
|
case Permanent => builder.setLifeCycle(LifeCycleType.PERMANENT)
|
|
|
|
|
case Temporary => builder.setLifeCycle(LifeCycleType.TEMPORARY)
|
|
|
|
|
}
|
|
|
|
|
val builder = LifeCycleProtocol.newBuilder
|
|
|
|
|
actorRef.lifeCycle match {
|
|
|
|
|
case Some(LifeCycle(scope)) =>
|
|
|
|
|
setScope(builder, scope)
|
|
|
|
|
Some(builder.build)
|
|
|
|
|
case None => None
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
val originalAddress = AddressProtocol.newBuilder
|
|
|
|
|
.setHostname(actorRef.homeAddress.getHostName)
|
|
|
|
|
.setPort(actorRef.homeAddress.getPort)
|
|
|
|
|
.build
|
|
|
|
|
|
|
|
|
|
val builder = SerializedActorRefProtocol.newBuilder
|
|
|
|
|
.setUuid(actorRef.uuid)
|
|
|
|
|
.setId(actorRef.id)
|
|
|
|
|
.setActorClassname(actorRef.actorClass.getName)
|
|
|
|
|
.setOriginalAddress(originalAddress)
|
|
|
|
|
.setIsTransactor(actorRef.isTransactor)
|
|
|
|
|
.setTimeout(actorRef.timeout)
|
|
|
|
|
|
|
|
|
|
actorRef.receiveTimeout.foreach(builder.setReceiveTimeout(_))
|
|
|
|
|
builder.setActorInstance(ByteString.copyFrom(format.toBinary(actorRef.actor.asInstanceOf[T])))
|
|
|
|
|
lifeCycleProtocol.foreach(builder.setLifeCycle(_))
|
|
|
|
|
actorRef.supervisor.foreach(s => builder.setSupervisor(RemoteActorSerialization.toRemoteActorRefProtocol(s)))
|
|
|
|
|
// FIXME: how to serialize the hotswap PartialFunction ??
|
|
|
|
|
//hotswap.foreach(builder.setHotswapStack(_))
|
|
|
|
|
builder.build
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private def fromBinaryToLocalActorRef[T <: Actor](bytes: Array[Byte], format: Format[T]): ActorRef =
|
|
|
|
|
fromProtobufToLocalActorRef(SerializedActorRefProtocol.newBuilder.mergeFrom(bytes).build, format, None)
|
|
|
|
|
|
|
|
|
|
private def fromProtobufToLocalActorRef[T <: Actor](
|
|
|
|
|
protocol: SerializedActorRefProtocol, format: Format[T], loader: Option[ClassLoader]): ActorRef = {
|
|
|
|
|
Actor.log.debug("Deserializing SerializedActorRefProtocol to LocalActorRef:\n" + protocol)
|
|
|
|
|
|
|
|
|
|
val serializer =
|
|
|
|
|
if (format.isInstanceOf[SerializerBasedActorFormat[_]])
|
|
|
|
|
Some(format.asInstanceOf[SerializerBasedActorFormat[_]].serializer)
|
|
|
|
|
else None
|
|
|
|
|
|
|
|
|
|
val lifeCycle =
|
|
|
|
|
if (protocol.hasLifeCycle) {
|
|
|
|
|
val lifeCycleProtocol = protocol.getLifeCycle
|
|
|
|
|
Some(if (lifeCycleProtocol.getLifeCycle == LifeCycleType.PERMANENT) LifeCycle(Permanent)
|
|
|
|
|
else if (lifeCycleProtocol.getLifeCycle == LifeCycleType.TEMPORARY) LifeCycle(Temporary)
|
|
|
|
|
else throw new IllegalActorStateException("LifeCycle type is not valid: " + lifeCycleProtocol.getLifeCycle))
|
|
|
|
|
} else None
|
|
|
|
|
|
|
|
|
|
val supervisor =
|
|
|
|
|
if (protocol.hasSupervisor)
|
|
|
|
|
Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(protocol.getSupervisor, loader))
|
|
|
|
|
else None
|
|
|
|
|
|
|
|
|
|
val hotswap =
|
|
|
|
|
if (serializer.isDefined && protocol.hasHotswapStack) Some(serializer.get
|
|
|
|
|
.fromBinary(protocol.getHotswapStack.toByteArray, Some(classOf[PartialFunction[Any, Unit]]))
|
|
|
|
|
.asInstanceOf[PartialFunction[Any, Unit]])
|
|
|
|
|
else None
|
|
|
|
|
|
|
|
|
|
val classLoader = loader.getOrElse(getClass.getClassLoader)
|
2010-08-28 19:27:42 +02:00
|
|
|
|
2010-08-28 16:48:27 +02:00
|
|
|
val factory = () => {
|
|
|
|
|
val actorClass = classLoader.loadClass(protocol.getActorClassname)
|
|
|
|
|
if (format.isInstanceOf[SerializerBasedActorFormat[_]])
|
|
|
|
|
format.asInstanceOf[SerializerBasedActorFormat[_]].serializer.fromBinary(
|
|
|
|
|
protocol.getActorInstance.toByteArray, Some(actorClass)).asInstanceOf[Actor]
|
|
|
|
|
else actorClass.newInstance.asInstanceOf[Actor]
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
val ar = new LocalActorRef(
|
|
|
|
|
protocol.getUuid,
|
|
|
|
|
protocol.getId,
|
|
|
|
|
protocol.getOriginalAddress.getHostname,
|
|
|
|
|
protocol.getOriginalAddress.getPort,
|
|
|
|
|
if (protocol.hasIsTransactor) protocol.getIsTransactor else false,
|
|
|
|
|
if (protocol.hasTimeout) protocol.getTimeout else Actor.TIMEOUT,
|
|
|
|
|
if (protocol.hasReceiveTimeout) Some(protocol.getReceiveTimeout) else None,
|
|
|
|
|
lifeCycle,
|
|
|
|
|
supervisor,
|
|
|
|
|
hotswap,
|
|
|
|
|
classLoader, // TODO: should we fall back to getClass.getClassLoader?
|
|
|
|
|
factory)
|
|
|
|
|
|
|
|
|
|
val messages = protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteRequestProtocol]]
|
|
|
|
|
messages.foreach(message => ar ! MessageSerializer.deserialize(message.getMessage))
|
2010-08-28 19:27:42 +02:00
|
|
|
|
2010-08-28 16:48:27 +02:00
|
|
|
if (format.isInstanceOf[SerializerBasedActorFormat[_]] == false)
|
|
|
|
|
format.fromBinary(protocol.getActorInstance.toByteArray, ar.actor.asInstanceOf[T])
|
|
|
|
|
ar
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
object RemoteActorSerialization {
|
|
|
|
|
/**
|
|
|
|
|
* Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance.
|
|
|
|
|
*/
|
|
|
|
|
def fromBinaryToRemoteActorRef(bytes: Array[Byte]): ActorRef =
|
|
|
|
|
fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, None)
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance.
|
|
|
|
|
*/
|
|
|
|
|
def fromBinaryToRemoteActorRef(bytes: Array[Byte], loader: ClassLoader): ActorRef =
|
|
|
|
|
fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader))
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Deserializes a RemoteActorRefProtocol Protocol Buffers (protobuf) Message into an RemoteActorRef instance.
|
|
|
|
|
*/
|
|
|
|
|
private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = {
|
|
|
|
|
Actor.log.debug("Deserializing RemoteActorRefProtocol to RemoteActorRef:\n" + protocol)
|
|
|
|
|
RemoteActorRef(
|
|
|
|
|
protocol.getUuid,
|
|
|
|
|
protocol.getActorClassname,
|
|
|
|
|
protocol.getHomeAddress.getHostname,
|
|
|
|
|
protocol.getHomeAddress.getPort,
|
|
|
|
|
protocol.getTimeout,
|
|
|
|
|
loader)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message.
|
|
|
|
|
*/
|
|
|
|
|
def toRemoteActorRefProtocol(ar: ActorRef): RemoteActorRefProtocol = {
|
|
|
|
|
import ar._
|
|
|
|
|
val host = homeAddress.getHostName
|
|
|
|
|
val port = homeAddress.getPort
|
|
|
|
|
|
|
|
|
|
if (!registeredInRemoteNodeDuringSerialization) {
|
|
|
|
|
Actor.log.debug("Register serialized Actor [%s] as remote @ [%s:%s]", actorClass.getName, host, port)
|
|
|
|
|
RemoteServer.getOrCreateServer(homeAddress)
|
|
|
|
|
RemoteServer.registerActor(homeAddress, uuid, ar)
|
|
|
|
|
registeredInRemoteNodeDuringSerialization = true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
RemoteActorRefProtocol.newBuilder
|
2010-09-16 13:50:57 +02:00
|
|
|
.setUuid(uuid)
|
2010-08-28 16:48:27 +02:00
|
|
|
.setActorClassname(actorClass.getName)
|
|
|
|
|
.setHomeAddress(AddressProtocol.newBuilder.setHostname(host).setPort(port).build)
|
|
|
|
|
.setTimeout(timeout)
|
|
|
|
|
.build
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
def createRemoteRequestProtocolBuilder(
|
|
|
|
|
actorRef: ActorRef,
|
|
|
|
|
message: Any,
|
|
|
|
|
isOneWay: Boolean,
|
|
|
|
|
senderOption: Option[ActorRef],
|
|
|
|
|
typedActorInfo: Option[Tuple2[String, String]],
|
|
|
|
|
actorType: ActorType):
|
|
|
|
|
RemoteRequestProtocol.Builder = {
|
|
|
|
|
import actorRef._
|
|
|
|
|
|
|
|
|
|
val actorInfoBuilder = ActorInfoProtocol.newBuilder
|
2010-09-16 13:50:57 +02:00
|
|
|
.setUuid(uuid)
|
|
|
|
|
.setId(actorRef.id)
|
2010-08-28 16:48:27 +02:00
|
|
|
.setTarget(actorClassName)
|
|
|
|
|
.setTimeout(timeout)
|
|
|
|
|
|
2010-08-28 19:27:42 +02:00
|
|
|
typedActorInfo.foreach { typedActor =>
|
2010-08-28 16:48:27 +02:00
|
|
|
actorInfoBuilder.setTypedActorInfo(
|
|
|
|
|
TypedActorInfoProtocol.newBuilder
|
|
|
|
|
.setInterface(typedActor._1)
|
|
|
|
|
.setMethod(typedActor._2)
|
|
|
|
|
.build)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
actorType match {
|
|
|
|
|
case ActorType.ScalaActor => actorInfoBuilder.setActorType(SCALA_ACTOR)
|
|
|
|
|
case ActorType.TypedActor => actorInfoBuilder.setActorType(TYPED_ACTOR)
|
|
|
|
|
}
|
|
|
|
|
val actorInfo = actorInfoBuilder.build
|
|
|
|
|
|
|
|
|
|
val requestBuilder = RemoteRequestProtocol.newBuilder
|
|
|
|
|
.setId(RemoteRequestProtocolIdFactory.nextId)
|
|
|
|
|
.setMessage(MessageSerializer.serialize(message))
|
|
|
|
|
.setActorInfo(actorInfo)
|
|
|
|
|
.setIsOneWay(isOneWay)
|
|
|
|
|
|
|
|
|
|
val id = registerSupervisorAsRemoteActor
|
|
|
|
|
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
|
|
|
|
|
|
|
|
|
|
senderOption.foreach { sender =>
|
|
|
|
|
RemoteServer.getOrCreateServer(sender.homeAddress).register(sender.uuid, sender)
|
|
|
|
|
requestBuilder.setSender(toRemoteActorRefProtocol(sender))
|
|
|
|
|
}
|
|
|
|
|
requestBuilder
|
|
|
|
|
}
|
|
|
|
|
}
|