Issue #595: Pluggable serializers - basic implementation

This commit is contained in:
Debasish Ghosh 2011-06-07 06:36:21 +05:30
parent b600d0cf52
commit 40d1ca6da2
22 changed files with 1071 additions and 480 deletions

View file

@ -23,27 +23,27 @@ import akka.remote.{ RemoteClientSettings, MessageSerializer }
* Module for local actor serialization.
*/
object ActorSerialization {
implicit val defaultSerializer = Format.Default
implicit val defaultSerializer = akka.serialization.JavaSerializer // Format.Default
def fromBinary[T <: Actor](bytes: Array[Byte], homeAddress: InetSocketAddress)(implicit format: Serializer): ActorRef =
fromBinaryToLocalActorRef(bytes, Some(homeAddress), format)
def fromBinary[T <: Actor](bytes: Array[Byte], homeAddress: InetSocketAddress): ActorRef =
fromBinaryToLocalActorRef(bytes, Some(homeAddress))
def fromBinary[T <: Actor](bytes: Array[Byte])(implicit format: Serializer): ActorRef =
fromBinaryToLocalActorRef(bytes, None, format)
def fromBinary[T <: Actor](bytes: Array[Byte]): ActorRef =
fromBinaryToLocalActorRef(bytes, None)
def toBinary[T <: Actor](a: ActorRef, serializeMailBox: Boolean = true)(implicit format: Serializer): Array[Byte] =
toSerializedActorRefProtocol(a, format, serializeMailBox).toByteArray
def toBinary[T <: Actor](a: ActorRef, serializeMailBox: Boolean = true): Array[Byte] =
toSerializedActorRefProtocol(a, serializeMailBox).toByteArray
// wrapper for implicits to be used by Java
def fromBinaryJ[T <: Actor](bytes: Array[Byte], format: Serializer): ActorRef =
fromBinary(bytes)(format)
def fromBinaryJ[T <: Actor](bytes: Array[Byte]): ActorRef =
fromBinary(bytes)
// wrapper for implicits to be used by Java
def toBinaryJ[T <: Actor](a: ActorRef, format: Serializer, srlMailBox: Boolean = true): Array[Byte] =
toBinary(a, srlMailBox)(format)
def toBinaryJ[T <: Actor](a: ActorRef, srlMailBox: Boolean = true): Array[Byte] =
toBinary(a, srlMailBox)
private[akka] def toSerializedActorRefProtocol[T <: Actor](
actorRef: ActorRef, format: Serializer, serializeMailBox: Boolean = true): SerializedActorRefProtocol = {
actorRef: ActorRef, serializeMailBox: Boolean = true): SerializedActorRefProtocol = {
val lifeCycleProtocol: Option[LifeCycleProtocol] = {
actorRef.lifeCycle match {
case Permanent Some(LifeCycleProtocol.newBuilder.setLifeCycle(LifeCycleType.PERMANENT).build)
@ -84,23 +84,28 @@ object ActorSerialization {
}
actorRef.receiveTimeout.foreach(builder.setReceiveTimeout(_))
builder.setActorInstance(ByteString.copyFrom(format.toBinary(actorRef.actor.asInstanceOf[T])))
// builder.setActorInstance(ByteString.copyFrom(format.toBinary(actorRef.actor.asInstanceOf[T])))
Serialization.serialize(actorRef.actor.asInstanceOf[T]) match {
case Right(bytes) builder.setActorInstance(ByteString.copyFrom(bytes))
case Left(exception) throw new Exception("Error serializing : " + actorRef.actor.getClass.getName)
}
lifeCycleProtocol.foreach(builder.setLifeCycle(_))
actorRef.supervisor.foreach(s builder.setSupervisor(RemoteActorSerialization.toRemoteActorRefProtocol(s)))
if (!actorRef.hotswap.isEmpty) builder.setHotswapStack(ByteString.copyFrom(Serializers.Java.toBinary(actorRef.hotswap)))
// if (!actorRef.hotswap.isEmpty) builder.setHotswapStack(ByteString.copyFrom(Serializers.Java.toBinary(actorRef.hotswap)))
if (!actorRef.hotswap.isEmpty) builder.setHotswapStack(ByteString.copyFrom(akka.serialization.JavaSerializer.toBinary(actorRef.hotswap)))
builder.build
}
private def fromBinaryToLocalActorRef[T <: Actor](
bytes: Array[Byte],
homeAddress: Option[InetSocketAddress],
format: Serializer): ActorRef = {
homeAddress: Option[InetSocketAddress]): ActorRef = {
val builder = SerializedActorRefProtocol.newBuilder.mergeFrom(bytes)
fromProtobufToLocalActorRef(builder.build, format, None)
fromProtobufToLocalActorRef(builder.build, None)
}
private[akka] def fromProtobufToLocalActorRef[T <: Actor](
protocol: SerializedActorRefProtocol, format: Serializer, loader: Option[ClassLoader]): ActorRef = {
protocol: SerializedActorRefProtocol, loader: Option[ClassLoader]): ActorRef = {
val lifeCycle =
if (protocol.hasLifeCycle) {
@ -117,9 +122,13 @@ object ActorSerialization {
val hotswap =
try {
format
.fromBinary(protocol.getHotswapStack.toByteArray, Some(classOf[Stack[PartialFunction[Any, Unit]]]))
.asInstanceOf[Stack[PartialFunction[Any, Unit]]]
Serialization.deserialize(protocol.getHotswapStack.toByteArray, classOf[Stack[PartialFunction[Any, Unit]]], loader) match {
case Right(r) r.asInstanceOf[Stack[PartialFunction[Any, Unit]]]
case Left(ex) throw new Exception("Cannot de-serialize hotswapstack")
}
// format
// .fromBinary(protocol.getHotswapStack.toByteArray, Some(classOf[Stack[PartialFunction[Any, Unit]]]))
// .asInstanceOf[Stack[PartialFunction[Any, Unit]]]
} catch {
case e: Exception Stack[PartialFunction[Any, Unit]]()
}
@ -129,7 +138,11 @@ object ActorSerialization {
val factory = () {
val actorClass = classLoader.loadClass(protocol.getActorClassname)
try {
format.fromBinary(protocol.getActorInstance.toByteArray, Some(actorClass)).asInstanceOf[Actor]
Serialization.deserialize(protocol.getActorInstance.toByteArray, actorClass, loader) match {
case Right(r) r.asInstanceOf[Actor]
case Left(ex) throw new Exception("Cannot de-serialize : " + actorClass)
}
// format.fromBinary(protocol.getActorInstance.toByteArray, Some(actorClass)).asInstanceOf[Actor]
} catch {
case e: Exception actorClass.newInstance.asInstanceOf[Actor]
}
@ -146,11 +159,7 @@ object ActorSerialization {
factory)
val messages = protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteMessageProtocol]]
messages.foreach(message ar ! MessageSerializer.deserialize(message.getMessage))
//if (format.isInstanceOf[SerializerBasedActorFormat[_]] == false)
// format.fromBinary(protocol.getActorInstance.toByteArray, ar.actor.asInstanceOf[T])
//ar
messages.foreach(message ar ! MessageSerializer.deserialize(message.getMessage, Some(classLoader)))
ar
}
}
@ -174,7 +183,7 @@ object RemoteActorSerialization {
*/
private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = {
RemoteActorRef(
Serializers.Java.fromBinary(protocol.getInetSocketAddress.toByteArray, Some(classOf[InetSocketAddress])).asInstanceOf[InetSocketAddress],
JavaSerializer.fromBinary(protocol.getInetSocketAddress.toByteArray, Some(classOf[InetSocketAddress]), loader).asInstanceOf[InetSocketAddress],
protocol.getAddress,
protocol.getTimeout,
loader)
@ -194,7 +203,7 @@ object RemoteActorSerialization {
ReflectiveAccess.RemoteModule.configDefaultAddress
}
RemoteActorRefProtocol.newBuilder
.setInetSocketAddress(ByteString.copyFrom(Serializers.Java.toBinary(remoteAddress)))
.setInetSocketAddress(ByteString.copyFrom(JavaSerializer.toBinary(remoteAddress)))
.setAddress(actor.address)
.setTimeout(actor.timeout)
.build
@ -230,7 +239,7 @@ object RemoteActorSerialization {
message match {
case Right(message)
messageBuilder.setMessage(MessageSerializer.serialize(message))
messageBuilder.setMessage(MessageSerializer.serialize(message.asInstanceOf[AnyRef]))
case Left(exception)
messageBuilder.setException(ExceptionProtocol.newBuilder
.setClassname(exception.getClass.getName)