Added serialization of actor mailbox

This commit is contained in:
Jonas Bonér 2010-06-24 08:48:48 +02:00
parent f1f3c92112
commit c3d723ca61
10 changed files with 897 additions and 611 deletions

View file

@ -39,6 +39,16 @@ message SerializedActorRefProtocol {
optional LifeCycleProtocol lifeCycle = 9; optional LifeCycleProtocol lifeCycle = 9;
optional RemoteActorRefProtocol supervisor = 10; optional RemoteActorRefProtocol supervisor = 10;
optional bytes hotswapStack = 11; optional bytes hotswapStack = 11;
repeated RemoteRequestProtocol messages = 12;
}
/**
* Defines a message.
*/
message MessageProtocol {
required SerializationSchemeType serializationScheme = 1;
required bytes message = 2;
optional bytes messageManifest = 3;
} }
/** /**
@ -46,18 +56,16 @@ message SerializedActorRefProtocol {
*/ */
message RemoteRequestProtocol { message RemoteRequestProtocol {
required uint64 id = 1; required uint64 id = 1;
required SerializationSchemeType serializationScheme = 2; required MessageProtocol message = 2;
required bytes message = 3; optional string method = 3;
optional bytes messageManifest = 4; required string target = 4;
optional string method = 5; required string uuid = 5;
required string target = 6; required uint64 timeout = 6;
required string uuid = 7; optional string supervisorUuid = 7;
required uint64 timeout = 8; required bool isActor = 8;
optional string supervisorUuid = 9; required bool isOneWay = 9;
required bool isActor = 10; required bool isEscaped = 10;
required bool isOneWay = 11; optional RemoteActorRefProtocol sender = 11;
required bool isEscaped = 12;
optional RemoteActorRefProtocol sender = 13;
} }
/** /**
@ -65,13 +73,11 @@ message RemoteRequestProtocol {
*/ */
message RemoteReplyProtocol { message RemoteReplyProtocol {
required uint64 id = 1; required uint64 id = 1;
optional SerializationSchemeType serializationScheme = 2; optional MessageProtocol message = 2;
optional bytes message = 3; optional ExceptionProtocol exception = 3;
optional bytes messageManifest = 4; optional string supervisorUuid = 4;
optional ExceptionProtocol exception = 5; required bool isActor = 5;
optional string supervisorUuid = 6; required bool isSuccessful = 6;
required bool isActor = 7;
required bool isSuccessful = 8;
} }
/** /**

View file

@ -7,7 +7,7 @@ package se.scalablesolutions.akka.actor
import Actor._ import Actor._
import se.scalablesolutions.akka.config.FaultHandlingStrategy import se.scalablesolutions.akka.config.FaultHandlingStrategy
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol
import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestProtocolIdFactory} import se.scalablesolutions.akka.remote.{MessageSerializer, RemoteClient, RemoteRequestProtocolIdFactory}
import se.scalablesolutions.akka.dispatch.{MessageDispatcher, Future, CompletableFuture} import se.scalablesolutions.akka.dispatch.{MessageDispatcher, Future, CompletableFuture}
import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.serialization.Serializer
@ -563,6 +563,7 @@ private[akka] sealed class ActiveObjectAspect {
val (message: Array[AnyRef], isEscaped) = escapeArguments(rtti.getParameterValues) val (message: Array[AnyRef], isEscaped) = escapeArguments(rtti.getParameterValues)
val requestBuilder = RemoteRequestProtocol.newBuilder val requestBuilder = RemoteRequestProtocol.newBuilder
.setId(RemoteRequestProtocolIdFactory.nextId) .setId(RemoteRequestProtocolIdFactory.nextId)
.setMessage(MessageSerializer.serialize(message))
.setMethod(rtti.getMethod.getName) .setMethod(rtti.getMethod.getName)
.setTarget(target.getName) .setTarget(target.getName)
.setUuid(actorRef.uuid) .setUuid(actorRef.uuid)
@ -570,7 +571,6 @@ private[akka] sealed class ActiveObjectAspect {
.setIsActor(false) .setIsActor(false)
.setIsOneWay(isOneWay) .setIsOneWay(isOneWay)
.setIsEscaped(false) .setIsEscaped(false)
RemoteProtocolBuilder.setMessage(message, requestBuilder)
val id = actorRef.registerSupervisorAsRemoteActor val id = actorRef.registerSupervisorAsRemoteActor
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get) if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
val remoteMessage = requestBuilder.build val remoteMessage = requestBuilder.build

View file

@ -12,7 +12,7 @@ import se.scalablesolutions.akka.stm.global._
import se.scalablesolutions.akka.stm.TransactionManagement._ import se.scalablesolutions.akka.stm.TransactionManagement._
import se.scalablesolutions.akka.stm.TransactionManagement import se.scalablesolutions.akka.stm.TransactionManagement
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._ import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._
import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer, RemoteClient, RemoteProtocolBuilder, RemoteRequestProtocolIdFactory} import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer, RemoteClient, MessageSerializer, RemoteRequestProtocolIdFactory}
import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.util.{HashCode, Logging, UUID, ReentrantGuard} import se.scalablesolutions.akka.util.{HashCode, Logging, UUID, ReentrantGuard}
@ -146,7 +146,8 @@ object ActorRef {
supervisor, supervisor,
hotswap, hotswap,
loader.getOrElse(getClass.getClassLoader), // TODO: should we fall back to getClass.getClassLoader? loader.getOrElse(getClass.getClassLoader), // TODO: should we fall back to getClass.getClassLoader?
serializer) serializer,
protocol.getMessagesList.toArray.toList.asInstanceOf[List[RemoteRequestProtocol]])
} }
} }
@ -489,6 +490,11 @@ trait ActorRef extends TransactionManagement {
*/ */
def actorClass: Class[_ <: Actor] def actorClass: Class[_ <: Actor]
/**
* Returns the class name for the Actor instance that is managed by the ActorRef.
*/
def actorClassName: String
/** /**
* Sets the dispatcher for this actor. Needs to be invoked before the actor is started. * Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
*/ */
@ -639,6 +645,28 @@ trait ActorRef extends TransactionManagement {
*/ */
def shutdownLinkedActors: Unit def shutdownLinkedActors: Unit
protected def createRemoteRequestProtocolBuilder(
message: Any, isOneWay: Boolean, senderOption: Option[ActorRef]): RemoteRequestProtocol.Builder = {
val protocol = RemoteRequestProtocol.newBuilder
.setId(RemoteRequestProtocolIdFactory.nextId)
.setMessage(MessageSerializer.serialize(message))
.setTarget(actorClassName)
.setTimeout(timeout)
.setUuid(uuid)
.setIsActor(true)
.setIsOneWay(isOneWay)
.setIsEscaped(false)
val id = registerSupervisorAsRemoteActor
if (id.isDefined) protocol.setSupervisorUuid(id.get)
senderOption.foreach { sender =>
RemoteServer.getOrCreateServer(sender.homeAddress).register(sender.uuid, sender)
protocol.setSender(sender.toRemoteActorRefProtocol)
}
protocol
}
protected[akka] def toRemoteActorRefProtocol: RemoteActorRefProtocol protected[akka] def toRemoteActorRefProtocol: RemoteActorRefProtocol
protected[akka] def toSerializedActorRefProtocol: SerializedActorRefProtocol protected[akka] def toSerializedActorRefProtocol: SerializedActorRefProtocol
@ -682,13 +710,6 @@ trait ActorRef extends TransactionManagement {
} }
override def toString = "Actor[" + id + ":" + uuid + "]" override def toString = "Actor[" + id + ":" + uuid + "]"
protected def processSender(senderOption: Option[ActorRef], requestBuilder: RemoteRequestProtocol.Builder) = {
senderOption.foreach { sender =>
RemoteServer.getOrCreateServer(sender.homeAddress).register(sender.uuid, sender)
requestBuilder.setSender(sender.toRemoteActorRefProtocol)
}
}
} }
/** /**
@ -719,7 +740,8 @@ sealed class LocalActorRef private[akka](
__supervisor: Option[ActorRef], __supervisor: Option[ActorRef],
__hotswap: Option[PartialFunction[Any, Unit]], __hotswap: Option[PartialFunction[Any, Unit]],
__loader: ClassLoader, __loader: ClassLoader,
__serializer: Option[Serializer]) = { __serializer: Option[Serializer],
__messages: List[RemoteRequestProtocol]) = {
this(() => { this(() => {
val actorClass = __loader.loadClass(__actorClassName) val actorClass = __loader.loadClass(__actorClassName)
val actorInstance = actorClass.newInstance val actorInstance = actorClass.newInstance
@ -752,6 +774,8 @@ sealed class LocalActorRef private[akka](
actorSelfFields._1.set(actor, this) actorSelfFields._1.set(actor, this)
actorSelfFields._2.set(actor, Some(this)) actorSelfFields._2.set(actor, Some(this))
actorSelfFields._3.set(actor, Some(this)) actorSelfFields._3.set(actor, Some(this))
start
__messages.foreach(message => this ! MessageSerializer.deserialize(message.getMessage))
ActorRegistry.register(this) ActorRegistry.register(this)
} }
@ -799,6 +823,7 @@ sealed class LocalActorRef private[akka](
"Can't serialize an ActorRef using SerializedActorRefProtocol" + "Can't serialize an ActorRef using SerializedActorRefProtocol" +
"\nthat is wrapping an Actor that is not mixing in the SerializableActor trait") "\nthat is wrapping an Actor that is not mixing in the SerializableActor trait")
stop // stop actor since it can not be used any more since we have serialized it and taken all messagess with us
val lifeCycleProtocol: Option[LifeCycleProtocol] = { val lifeCycleProtocol: Option[LifeCycleProtocol] = {
def setScope(builder: LifeCycleProtocol.Builder, scope: Scope) = scope match { def setScope(builder: LifeCycleProtocol.Builder, scope: Scope) = scope match {
case Permanent => builder.setLifeCycle(LifeCycleType.PERMANENT) case Permanent => builder.setLifeCycle(LifeCycleType.PERMANENT)
@ -839,6 +864,12 @@ sealed class LocalActorRef private[akka](
supervisor.foreach(s => builder.setSupervisor(s.toRemoteActorRefProtocol)) supervisor.foreach(s => builder.setSupervisor(s.toRemoteActorRefProtocol))
// FIXME: how to serialize the hotswap PartialFunction ?? // FIXME: how to serialize the hotswap PartialFunction ??
//hotswap.foreach(builder.setHotswapStack(_)) //hotswap.foreach(builder.setHotswapStack(_))
var message = mailbox.poll
while (message != null) {
builder.addMessages(createRemoteRequestProtocolBuilder(
message.message, message.senderFuture.isEmpty, message.sender))
message = mailbox.poll
}
builder.build builder.build
} }
@ -862,6 +893,11 @@ sealed class LocalActorRef private[akka](
*/ */
def actorClass: Class[_ <: Actor] = actor.getClass.asInstanceOf[Class[_ <: Actor]] def actorClass: Class[_ <: Actor] = actor.getClass.asInstanceOf[Class[_ <: Actor]]
/**
* Returns the class name for the Actor instance that is managed by the ActorRef.
*/
def actorClassName: String = actorClass.getName
/** /**
* Sets the dispatcher for this actor. Needs to be invoked before the actor is started. * Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
*/ */
@ -1141,21 +1177,8 @@ sealed class LocalActorRef private[akka](
joinTransaction(message) joinTransaction(message)
if (remoteAddress.isDefined) { if (remoteAddress.isDefined) {
val requestBuilder = RemoteRequestProtocol.newBuilder RemoteClient.clientFor(remoteAddress.get).send[Any](
.setId(RemoteRequestProtocolIdFactory.nextId) createRemoteRequestProtocolBuilder(message, true, senderOption).build, None)
.setTarget(actorClass.getName)
.setTimeout(timeout)
.setUuid(uuid)
.setIsActor(true)
.setIsOneWay(true)
.setIsEscaped(false)
val id = registerSupervisorAsRemoteActor
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
processSender(senderOption, requestBuilder)
RemoteProtocolBuilder.setMessage(message, requestBuilder)
RemoteClient.clientFor(remoteAddress.get).send[Any](requestBuilder.build, None)
} else { } else {
val invocation = new MessageInvocation(this, message, senderOption, None, transactionSet.get) val invocation = new MessageInvocation(this, message, senderOption, None, transactionSet.get)
if (dispatcher.usesActorMailbox) { if (dispatcher.usesActorMailbox) {
@ -1173,22 +1196,8 @@ sealed class LocalActorRef private[akka](
joinTransaction(message) joinTransaction(message)
if (remoteAddress.isDefined) { if (remoteAddress.isDefined) {
val requestBuilder = RemoteRequestProtocol.newBuilder val future = RemoteClient.clientFor(remoteAddress.get).send(
.setId(RemoteRequestProtocolIdFactory.nextId) createRemoteRequestProtocolBuilder(message, false, senderOption).build, senderFuture)
.setTarget(actorClass.getName)
.setTimeout(timeout)
.setUuid(uuid)
.setIsActor(true)
.setIsOneWay(false)
.setIsEscaped(false)
//senderOption.foreach(sender => requestBuilder.setSender(sender.toRemoteActorRefProtocol))
RemoteProtocolBuilder.setMessage(message, requestBuilder)
val id = registerSupervisorAsRemoteActor
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
val future = RemoteClient.clientFor(remoteAddress.get).send(requestBuilder.build, senderFuture)
if (future.isDefined) future.get if (future.isDefined) future.get
else throw new IllegalStateException("Expected a future from remote call to actor " + toString) else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
} else { } else {
@ -1441,17 +1450,7 @@ private[akka] case class RemoteActorRef private[akka] (
lazy val remoteClient = RemoteClient.clientFor(hostname, port, loader) lazy val remoteClient = RemoteClient.clientFor(hostname, port, loader)
def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = { def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = {
val requestBuilder = RemoteRequestProtocol.newBuilder remoteClient.send[Any](createRemoteRequestProtocolBuilder(message, true, senderOption).build, None)
.setId(RemoteRequestProtocolIdFactory.nextId)
.setTarget(className)
.setTimeout(timeout)
.setUuid(uuid)
.setIsActor(true)
.setIsOneWay(true)
.setIsEscaped(false)
processSender(senderOption, requestBuilder)
RemoteProtocolBuilder.setMessage(message, requestBuilder)
remoteClient.send[Any](requestBuilder.build, None)
} }
def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
@ -1459,16 +1458,7 @@ private[akka] case class RemoteActorRef private[akka] (
timeout: Long, timeout: Long,
senderOption: Option[ActorRef], senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
val requestBuilder = RemoteRequestProtocol.newBuilder val future = remoteClient.send(createRemoteRequestProtocolBuilder(message, false, senderOption).build, senderFuture)
.setId(RemoteRequestProtocolIdFactory.nextId)
.setTarget(className)
.setTimeout(timeout)
.setUuid(uuid)
.setIsActor(true)
.setIsOneWay(false)
.setIsEscaped(false)
RemoteProtocolBuilder.setMessage(message, requestBuilder)
val future = remoteClient.send(requestBuilder.build, senderFuture)
if (future.isDefined) future.get if (future.isDefined) future.get
else throw new IllegalStateException("Expected a future from remote call to actor " + toString) else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
} }
@ -1483,9 +1473,16 @@ private[akka] case class RemoteActorRef private[akka] (
_isShutDown = true _isShutDown = true
} }
/**
* Returns the class name for the Actor instance that is managed by the ActorRef.
*/
def actorClassName: String = className
protected[akka] def registerSupervisorAsRemoteActor: Option[String] = None
// ==== NOT SUPPORTED ==== // ==== NOT SUPPORTED ====
def toBinary: Array[Byte] = unsupported
def actorClass: Class[_ <: Actor] = unsupported def actorClass: Class[_ <: Actor] = unsupported
def toBinary: Array[Byte] = unsupported
def dispatcher_=(md: MessageDispatcher): Unit = unsupported def dispatcher_=(md: MessageDispatcher): Unit = unsupported
def dispatcher: MessageDispatcher = unsupported def dispatcher: MessageDispatcher = unsupported
def makeTransactionRequired: Unit = unsupported def makeTransactionRequired: Unit = unsupported
@ -1512,7 +1509,6 @@ private[akka] case class RemoteActorRef private[akka] (
protected[akka] def restart(reason: Throwable): Unit = unsupported protected[akka] def restart(reason: Throwable): Unit = unsupported
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported
protected[akka] def restartLinkedActors(reason: Throwable): Unit = unsupported protected[akka] def restartLinkedActors(reason: Throwable): Unit = unsupported
protected[akka] def registerSupervisorAsRemoteActor: Option[String] = unsupported
protected[akka] def linkedActors: JMap[String, ActorRef] = unsupported protected[akka] def linkedActors: JMap[String, ActorRef] = unsupported
protected[akka] def linkedActorsAsList: List[ActorRef] = unsupported protected[akka] def linkedActorsAsList: List[ActorRef] = unsupported
protected[akka] def invoke(messageHandle: MessageInvocation): Unit = unsupported protected[akka] def invoke(messageHandle: MessageInvocation): Unit = unsupported

View file

@ -0,0 +1,102 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.remote
import se.scalablesolutions.akka.serialization.{Serializer, Serializable}
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._
import com.google.protobuf.{Message, ByteString}
object MessageSerializer {
private var SERIALIZER_JAVA: Serializer.Java = Serializer.Java
private var SERIALIZER_JAVA_JSON: Serializer.JavaJSON = Serializer.JavaJSON
private var SERIALIZER_SCALA_JSON: Serializer.ScalaJSON = Serializer.ScalaJSON
private var SERIALIZER_SBINARY: Serializer.SBinary = Serializer.SBinary
private var SERIALIZER_PROTOBUF: Serializer.Protobuf = Serializer.Protobuf
def setClassLoader(cl: ClassLoader) = {
SERIALIZER_JAVA.classLoader = Some(cl)
SERIALIZER_JAVA_JSON.classLoader = Some(cl)
SERIALIZER_SCALA_JSON.classLoader = Some(cl)
SERIALIZER_SBINARY.classLoader = Some(cl)
}
def deserialize(messageProtocol: MessageProtocol): Any = {
messageProtocol.getSerializationScheme match {
case SerializationSchemeType.JAVA =>
unbox(SERIALIZER_JAVA.fromBinary(messageProtocol.getMessage.toByteArray, None))
case SerializationSchemeType.SBINARY =>
val classToLoad = new String(messageProtocol.getMessageManifest.toByteArray)
val clazz = if (SERIALIZER_SBINARY.classLoader.isDefined) SERIALIZER_SBINARY.classLoader.get.loadClass(classToLoad)
else Class.forName(classToLoad)
val renderer = clazz.newInstance.asInstanceOf[Serializable.SBinary[_ <: AnyRef]]
renderer.fromBytes(messageProtocol.getMessage.toByteArray)
case SerializationSchemeType.SCALA_JSON =>
val manifest = SERIALIZER_JAVA.fromBinary(messageProtocol.getMessageManifest.toByteArray, None).asInstanceOf[String]
SERIALIZER_SCALA_JSON.fromBinary(messageProtocol.getMessage.toByteArray, Some(Class.forName(manifest)))
case SerializationSchemeType.JAVA_JSON =>
val manifest = SERIALIZER_JAVA.fromBinary(messageProtocol.getMessageManifest.toByteArray, None).asInstanceOf[String]
SERIALIZER_JAVA_JSON.fromBinary(messageProtocol.getMessage.toByteArray, Some(Class.forName(manifest)))
case SerializationSchemeType.PROTOBUF =>
val messageClass = SERIALIZER_JAVA.fromBinary(messageProtocol.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]]
SERIALIZER_PROTOBUF.fromBinary(messageProtocol.getMessage.toByteArray, Some(messageClass))
}
}
def serialize(message: Any): MessageProtocol = {
val builder = MessageProtocol.newBuilder
if (message.isInstanceOf[Serializable.SBinary[_]]) {
val serializable = message.asInstanceOf[Serializable.SBinary[_ <: Any]]
builder.setSerializationScheme(SerializationSchemeType.SBINARY)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
} else if (message.isInstanceOf[Message]) {
val serializable = message.asInstanceOf[Message]
builder.setSerializationScheme(SerializationSchemeType.PROTOBUF)
builder.setMessage(ByteString.copyFrom(serializable.toByteArray))
builder.setMessageManifest(ByteString.copyFrom(SERIALIZER_JAVA.toBinary(serializable.getClass)))
} else if (message.isInstanceOf[Serializable.ScalaJSON]) {
val serializable = message.asInstanceOf[Serializable.ScalaJSON]
builder.setSerializationScheme(SerializationSchemeType.SCALA_JSON)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
} else if (message.isInstanceOf[Serializable.JavaJSON]) {
val serializable = message.asInstanceOf[Serializable.JavaJSON]
builder.setSerializationScheme(SerializationSchemeType.JAVA_JSON)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
} else {
// default, e.g. if no protocol used explicitly then use Java serialization
builder.setSerializationScheme(SerializationSchemeType.JAVA)
builder.setMessage(ByteString.copyFrom(SERIALIZER_JAVA.toBinary(box(message))))
}
builder.build
}
private def box(value: Any): AnyRef = value match {
case value: Boolean => new java.lang.Boolean(value)
case value: Char => new java.lang.Character(value)
case value: Short => new java.lang.Short(value)
case value: Int => new java.lang.Integer(value)
case value: Long => new java.lang.Long(value)
case value: Float => new java.lang.Float(value)
case value: Double => new java.lang.Double(value)
case value: Byte => new java.lang.Byte(value)
case value => value.asInstanceOf[AnyRef]
}
private def unbox(value: AnyRef): Any = value match {
case value: java.lang.Boolean => value.booleanValue
case value: java.lang.Character => value.charValue
case value: java.lang.Short => value.shortValue
case value: java.lang.Integer => value.intValue
case value: java.lang.Long => value.longValue
case value: java.lang.Float => value.floatValue
case value: java.lang.Double => value.doubleValue
case value: java.lang.Byte => value.byteValue
case value => value
}
}

View file

@ -104,7 +104,7 @@ object RemoteClient extends Logging {
val hostname = address.getHostName val hostname = address.getHostName
val port = address.getPort val port = address.getPort
val hash = hostname + ':' + port val hash = hostname + ':' + port
loader.foreach(RemoteProtocolBuilder.setClassLoader(_)) loader.foreach(MessageSerializer.setClassLoader(_))
if (remoteClients.contains(hash)) remoteClients(hash) if (remoteClients.contains(hash)) remoteClients(hash)
else { else {
val client = new RemoteClient(hostname, port, loader) val client = new RemoteClient(hostname, port, loader)
@ -297,7 +297,7 @@ class RemoteClientHandler(val name: String,
log.debug("Remote client received RemoteReplyProtocol[\n%s]", reply.toString) log.debug("Remote client received RemoteReplyProtocol[\n%s]", reply.toString)
val future = futures.get(reply.getId).asInstanceOf[CompletableFuture[Any]] val future = futures.get(reply.getId).asInstanceOf[CompletableFuture[Any]]
if (reply.getIsSuccessful) { if (reply.getIsSuccessful) {
val message = RemoteProtocolBuilder.getMessage(reply) val message = MessageSerializer.deserialize(reply.getMessage)
future.completeWithResult(message) future.completeWithResult(message)
} else { } else {
if (reply.hasSupervisorUuid()) { if (reply.hasSupervisorUuid()) {

View file

@ -1,150 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.remote
import se.scalablesolutions.akka.serialization.{Serializer, Serializable}
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._
import com.google.protobuf.{Message, ByteString}
object RemoteProtocolBuilder {
private var SERIALIZER_JAVA: Serializer.Java = Serializer.Java
private var SERIALIZER_JAVA_JSON: Serializer.JavaJSON = Serializer.JavaJSON
private var SERIALIZER_SCALA_JSON: Serializer.ScalaJSON = Serializer.ScalaJSON
private var SERIALIZER_SBINARY: Serializer.SBinary = Serializer.SBinary
private var SERIALIZER_PROTOBUF: Serializer.Protobuf = Serializer.Protobuf
def setClassLoader(cl: ClassLoader) = {
SERIALIZER_JAVA.classLoader = Some(cl)
SERIALIZER_JAVA_JSON.classLoader = Some(cl)
SERIALIZER_SCALA_JSON.classLoader = Some(cl)
SERIALIZER_SBINARY.classLoader = Some(cl)
}
def getMessage(request: RemoteRequestProtocol): Any = {
request.getSerializationScheme match {
case SerializationSchemeType.JAVA =>
unbox(SERIALIZER_JAVA.fromBinary(request.getMessage.toByteArray, None))
case SerializationSchemeType.SBINARY =>
val classToLoad = new String(request.getMessageManifest.toByteArray)
val clazz = if (SERIALIZER_SBINARY.classLoader.isDefined) SERIALIZER_SBINARY.classLoader.get.loadClass(classToLoad)
else Class.forName(classToLoad)
val renderer = clazz.newInstance.asInstanceOf[Serializable.SBinary[_ <: AnyRef]]
renderer.fromBytes(request.getMessage.toByteArray)
case SerializationSchemeType.SCALA_JSON =>
val manifest = SERIALIZER_JAVA.fromBinary(request.getMessageManifest.toByteArray, None).asInstanceOf[String]
SERIALIZER_SCALA_JSON.fromBinary(request.getMessage.toByteArray, Some(Class.forName(manifest)))
case SerializationSchemeType.JAVA_JSON =>
val manifest = SERIALIZER_JAVA.fromBinary(request.getMessageManifest.toByteArray, None).asInstanceOf[String]
SERIALIZER_JAVA_JSON.fromBinary(request.getMessage.toByteArray, Some(Class.forName(manifest)))
case SerializationSchemeType.PROTOBUF =>
val messageClass = SERIALIZER_JAVA.fromBinary(request.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]]
SERIALIZER_PROTOBUF.fromBinary(request.getMessage.toByteArray, Some(messageClass))
}
}
def getMessage(reply: RemoteReplyProtocol): Any = {
reply.getSerializationScheme match {
case SerializationSchemeType.JAVA =>
unbox(SERIALIZER_JAVA.fromBinary(reply.getMessage.toByteArray, None))
case SerializationSchemeType.SBINARY =>
val classToLoad = new String(reply.getMessageManifest.toByteArray)
val clazz = if (SERIALIZER_SBINARY.classLoader.isDefined) SERIALIZER_SBINARY.classLoader.get.loadClass(classToLoad)
else Class.forName(classToLoad)
val renderer = clazz.newInstance.asInstanceOf[Serializable.SBinary[_ <: AnyRef]]
renderer.fromBytes(reply.getMessage.toByteArray)
case SerializationSchemeType.SCALA_JSON =>
val manifest = SERIALIZER_JAVA.fromBinary(reply.getMessageManifest.toByteArray, None).asInstanceOf[String]
SERIALIZER_SCALA_JSON.fromBinary(reply.getMessage.toByteArray, Some(Class.forName(manifest)))
case SerializationSchemeType.JAVA_JSON =>
val manifest = SERIALIZER_JAVA.fromBinary(reply.getMessageManifest.toByteArray, None).asInstanceOf[String]
SERIALIZER_JAVA_JSON.fromBinary(reply.getMessage.toByteArray, Some(Class.forName(manifest)))
case SerializationSchemeType.PROTOBUF =>
val messageClass = SERIALIZER_JAVA.fromBinary(reply.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]]
SERIALIZER_PROTOBUF.fromBinary(reply.getMessage.toByteArray, Some(messageClass))
}
}
def setMessage(message: Any, builder: RemoteRequestProtocol.Builder) = {
if (message.isInstanceOf[Serializable.SBinary[_]]) {
val serializable = message.asInstanceOf[Serializable.SBinary[_ <: Any]]
builder.setSerializationScheme(SerializationSchemeType.SBINARY)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
} else if (message.isInstanceOf[Message]) {
val serializable = message.asInstanceOf[Message]
builder.setSerializationScheme(SerializationSchemeType.PROTOBUF)
builder.setMessage(ByteString.copyFrom(serializable.toByteArray))
builder.setMessageManifest(ByteString.copyFrom(SERIALIZER_JAVA.toBinary(serializable.getClass)))
} else if (message.isInstanceOf[Serializable.ScalaJSON]) {
val serializable = message.asInstanceOf[Serializable.ScalaJSON]
builder.setSerializationScheme(SerializationSchemeType.SCALA_JSON)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
} else if (message.isInstanceOf[Serializable.JavaJSON]) {
val serializable = message.asInstanceOf[Serializable.JavaJSON]
builder.setSerializationScheme(SerializationSchemeType.JAVA_JSON)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
} else {
// default, e.g. if no protocol used explicitly then use Java serialization
builder.setSerializationScheme(SerializationSchemeType.JAVA)
builder.setMessage(ByteString.copyFrom(SERIALIZER_JAVA.toBinary(box(message))))
}
}
def setMessage(message: Any, builder: RemoteReplyProtocol.Builder) = {
if (message.isInstanceOf[Serializable.SBinary[_]]) {
val serializable = message.asInstanceOf[Serializable.SBinary[_ <: Any]]
builder.setSerializationScheme(SerializationSchemeType.SBINARY)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
} else if (message.isInstanceOf[Message]) {
val serializable = message.asInstanceOf[Message]
builder.setSerializationScheme(SerializationSchemeType.PROTOBUF)
builder.setMessage(ByteString.copyFrom(serializable.toByteArray))
builder.setMessageManifest(ByteString.copyFrom(SERIALIZER_JAVA.toBinary(serializable.getClass)))
} else if (message.isInstanceOf[Serializable.ScalaJSON]) {
val serializable = message.asInstanceOf[Serializable.ScalaJSON]
builder.setSerializationScheme(SerializationSchemeType.SCALA_JSON)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
} else if (message.isInstanceOf[Serializable.JavaJSON]) {
val serializable = message.asInstanceOf[Serializable.JavaJSON]
builder.setSerializationScheme(SerializationSchemeType.JAVA_JSON)
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
} else {
// default, e.g. if no protocol used explicitly then use Java serialization
builder.setSerializationScheme(SerializationSchemeType.JAVA)
builder.setMessage(ByteString.copyFrom(SERIALIZER_JAVA.toBinary(box(message))))
}
}
private def box(value: Any): AnyRef = value match {
case value: Boolean => new java.lang.Boolean(value)
case value: Char => new java.lang.Character(value)
case value: Short => new java.lang.Short(value)
case value: Int => new java.lang.Integer(value)
case value: Long => new java.lang.Long(value)
case value: Float => new java.lang.Float(value)
case value: Double => new java.lang.Double(value)
case value: Byte => new java.lang.Byte(value)
case value => value.asInstanceOf[AnyRef]
}
private def unbox(value: AnyRef): Any = value match {
case value: java.lang.Boolean => value.booleanValue
case value: java.lang.Character => value.charValue
case value: java.lang.Short => value.shortValue
case value: java.lang.Integer => value.intValue
case value: java.lang.Long => value.longValue
case value: java.lang.Float => value.floatValue
case value: java.lang.Double => value.doubleValue
case value: java.lang.Byte => value.byteValue
case value => value
}
}

View file

@ -321,7 +321,7 @@ class RemoteServerHandler(
val activeObjects: JMap[String, AnyRef]) extends SimpleChannelUpstreamHandler with Logging { val activeObjects: JMap[String, AnyRef]) extends SimpleChannelUpstreamHandler with Logging {
val AW_PROXY_PREFIX = "$$ProxiedByAW".intern val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
applicationLoader.foreach(RemoteProtocolBuilder.setClassLoader(_)) applicationLoader.foreach(MessageSerializer.setClassLoader(_))
/** /**
* ChannelOpen overridden to store open channels for a clean shutdown of a RemoteServer. * ChannelOpen overridden to store open channels for a clean shutdown of a RemoteServer.
@ -363,7 +363,7 @@ class RemoteServerHandler(
log.debug("Dispatching to remote actor [%s:%s]", request.getTarget, request.getUuid) log.debug("Dispatching to remote actor [%s:%s]", request.getTarget, request.getUuid)
val actorRef = createActor(request.getTarget, request.getUuid, request.getTimeout) val actorRef = createActor(request.getTarget, request.getUuid, request.getTimeout)
actorRef.start actorRef.start
val message = RemoteProtocolBuilder.getMessage(request) val message = MessageSerializer.deserialize(request.getMessage)
val sender = val sender =
if (request.hasSender) Some(ActorRef.fromProtobufToRemoteActorRef(request.getSender, applicationLoader)) if (request.hasSender) Some(ActorRef.fromProtobufToRemoteActorRef(request.getSender, applicationLoader))
else None else None
@ -375,9 +375,9 @@ class RemoteServerHandler(
log.debug("Returning result from actor invocation [%s]", result) log.debug("Returning result from actor invocation [%s]", result)
val replyBuilder = RemoteReplyProtocol.newBuilder val replyBuilder = RemoteReplyProtocol.newBuilder
.setId(request.getId) .setId(request.getId)
.setMessage(MessageSerializer.serialize(result))
.setIsSuccessful(true) .setIsSuccessful(true)
.setIsActor(true) .setIsActor(true)
RemoteProtocolBuilder.setMessage(result, replyBuilder)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid) if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
val replyMessage = replyBuilder.build val replyMessage = replyBuilder.build
channel.write(replyMessage) channel.write(replyMessage)
@ -400,7 +400,7 @@ class RemoteServerHandler(
log.debug("Dispatching to remote active object [%s :: %s]", request.getMethod, request.getTarget) log.debug("Dispatching to remote active object [%s :: %s]", request.getMethod, request.getTarget)
val activeObject = createActiveObject(request.getTarget, request.getTimeout) val activeObject = createActiveObject(request.getTarget, request.getTimeout)
val args = RemoteProtocolBuilder.getMessage(request).asInstanceOf[Array[AnyRef]].toList val args = MessageSerializer.deserialize(request.getMessage).asInstanceOf[Array[AnyRef]].toList
val argClasses = args.map(_.getClass) val argClasses = args.map(_.getClass)
val (unescapedArgs, unescapedArgClasses) = unescapeArgs(args, argClasses, request.getTimeout) val (unescapedArgs, unescapedArgClasses) = unescapeArgs(args, argClasses, request.getTimeout)
@ -413,9 +413,9 @@ class RemoteServerHandler(
log.debug("Returning result from remote active object invocation [%s]", result) log.debug("Returning result from remote active object invocation [%s]", result)
val replyBuilder = RemoteReplyProtocol.newBuilder val replyBuilder = RemoteReplyProtocol.newBuilder
.setId(request.getId) .setId(request.getId)
.setMessage(MessageSerializer.serialize(result))
.setIsSuccessful(true) .setIsSuccessful(true)
.setIsActor(false) .setIsActor(false)
RemoteProtocolBuilder.setMessage(result, replyBuilder)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid) if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
val replyMessage = replyBuilder.build val replyMessage = replyBuilder.build
channel.write(replyMessage) channel.write(replyMessage)

View file

@ -51,6 +51,23 @@ class SerializableActorSpec extends
actor2.start actor2.start
(actor2 !! "hello").getOrElse("_") should equal("world") (actor2 !! "hello").getOrElse("_") should equal("world")
} }
it("should be able to serialize and deserialize a StatelessSerializableTestActorWithMessagesInMailbox") {
val actor1 = actorOf[StatelessSerializableTestActorWithMessagesInMailbox].start
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
val actor2 = ActorRef.fromBinaryToLocalActorRef(actor1.toBinary)
Thread.sleep(1000)
(actor2 !! "hello-reply").getOrElse("_") should equal("world")
}
} }
} }
@ -83,3 +100,13 @@ class ProtobufSerializableTestActor extends ProtobufSerializableActor[ProtobufPr
self.reply("world " + count) self.reply("world " + count)
} }
} }
class StatelessSerializableTestActorWithMessagesInMailbox extends StatelessSerializableActor {
def receive = {
case "hello" =>
println("# messages in mailbox " + self.mailbox.size)
Thread.sleep(500)
case "hello-reply" => self.reply("world")
}
}

View file

@ -100,7 +100,7 @@ class StmSpec extends
} }
} }
} }
/*
describe("Transactor") { describe("Transactor") {
it("should be able receive message sent with !! and pass it along to nested transactor with !! and receive reply; multiple times in a row") { it("should be able receive message sent with !! and pass it along to nested transactor with !! and receive reply; multiple times in a row") {
import GlobalTransactionVectorTestActor._ import GlobalTransactionVectorTestActor._
@ -121,51 +121,6 @@ class StmSpec extends
size4 should equal(3) size4 should equal(3)
} }
} }
/*
describe("Multiverse API") {
it("should blablabla") {
import org.multiverse.api.programmatic._
// import org.multiverse.api._
import org.multiverse.templates._
import java.util.concurrent.atomic._
import se.scalablesolutions.akka.stm.Ref
import org.multiverse.api.{GlobalStmInstance, ThreadLocalTransaction, Transaction => MultiverseTransaction}
import org.multiverse.api.lifecycle.{TransactionLifecycleListener, TransactionLifecycleEvent}
import org.multiverse.commitbarriers._
def createRef[T]: ProgrammaticReference[T] = GlobalStmInstance
.getGlobalStmInstance
.getProgrammaticReferenceFactoryBuilder
.build
.atomicCreateReference(null.asInstanceOf[T])
val ref1 = Ref(0)//createRef[Int]
val ref2 = Ref(0)//createRef[Int]
val committedCount = new AtomicInteger
val abortedCount = new AtomicInteger
val barrierHolder = new AtomicReference[CountDownCommitBarrier]
val template = new TransactionTemplate[Int]() {
override def onStart(tx: MultiverseTransaction) = barrierHolder.set(new CountDownCommitBarrier(1))
override def execute(tx: MultiverseTransaction): Int = {
ref1.swap(ref1.get.get + 1)
ref2.swap(ref2.get.get + 1)
barrierHolder.get.joinCommit(tx)
null.asInstanceOf[Int]
}
override def onPostCommit = committedCount.incrementAndGet
override def onPostAbort = abortedCount.incrementAndGet
}
template.execute
ref1.get.get should equal(1)
ref2.get.get should equal(1)
committedCount.get should equal(1)
abortedCount.get should equal(2)
}
}
*/ */
} }
@ -194,6 +149,7 @@ class GlobalTransactionVectorTestActor extends Actor {
class NestedTransactorLevelOneActor extends Actor { class NestedTransactorLevelOneActor extends Actor {
import GlobalTransactionVectorTestActor._ import GlobalTransactionVectorTestActor._
private val nested = actorOf[NestedTransactorLevelTwoActor].start private val nested = actorOf[NestedTransactorLevelTwoActor].start
self.timeout = 10000
def receive = { def receive = {
case add @ Add(_) => case add @ Add(_) =>
@ -210,6 +166,7 @@ class NestedTransactorLevelOneActor extends Actor {
class NestedTransactorLevelTwoActor extends Transactor { class NestedTransactorLevelTwoActor extends Transactor {
import GlobalTransactionVectorTestActor._ import GlobalTransactionVectorTestActor._
private val ref = Ref(0) private val ref = Ref(0)
self.timeout = 10000
def receive = { def receive = {
case Add(value) => case Add(value) =>