Upgraded existing code to new remote protocol, all tests pass
This commit is contained in:
parent
27ae559068
commit
16671dc5b7
10 changed files with 3747 additions and 2037 deletions
File diff suppressed because it is too large
Load diff
|
|
@ -1,46 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package se.scalablesolutions.akka.remote.protobuf;
|
||||
|
||||
/*
|
||||
Compile with:
|
||||
cd ./akka-core/src/main/java
|
||||
protoc se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.proto --java_out .
|
||||
*/
|
||||
|
||||
message ActorRefProtocol {
|
||||
required string uuid = 1;
|
||||
required string actorClassName = 2;
|
||||
required string sourceHostname = 3;
|
||||
required uint32 sourcePort = 4;
|
||||
required uint64 timeout = 5;
|
||||
}
|
||||
|
||||
message RemoteRequestProtocol {
|
||||
required uint64 id = 1;
|
||||
required uint32 protocol = 2;
|
||||
required bytes message = 3;
|
||||
optional bytes messageManifest = 4;
|
||||
optional string method = 5;
|
||||
required string target = 6;
|
||||
required string uuid = 7;
|
||||
required uint64 timeout = 8;
|
||||
optional string supervisorUuid = 9;
|
||||
required bool isActor = 10;
|
||||
required bool isOneWay = 11;
|
||||
required bool isEscaped = 12;
|
||||
optional ActorRefProtocol sender = 13;
|
||||
}
|
||||
|
||||
message RemoteReplyProtocol {
|
||||
required uint64 id = 1;
|
||||
optional uint32 protocol = 2;
|
||||
optional bytes message = 3;
|
||||
optional bytes messageManifest = 4;
|
||||
optional string exception = 5;
|
||||
optional string supervisorUuid = 6;
|
||||
required bool isActor = 7;
|
||||
required bool isSuccessful = 8;
|
||||
}
|
||||
File diff suppressed because it is too large
Load diff
118
akka-core/src/main/protocol/RemoteProtocol.proto
Normal file
118
akka-core/src/main/protocol/RemoteProtocol.proto
Normal file
|
|
@ -0,0 +1,118 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
option java_package = "se.scalablesolutions.akka.remote.protocol";
|
||||
option optimize_for = SPEED;
|
||||
|
||||
/******************************************
|
||||
Compile with:
|
||||
cd ./akka-core/src/main/protocol
|
||||
protoc RemoteProtocol.proto --java_out ../java
|
||||
*******************************************/
|
||||
|
||||
/**
|
||||
* Defines the serialization scheme used to serialize the message and/or Actor instance.
|
||||
*/
|
||||
enum SerializationSchemeProtocol {
|
||||
JAVA = 1;
|
||||
SBINARY = 2;
|
||||
SCALA_JSON = 3;
|
||||
JAVA_JSON = 4;
|
||||
PROTOBUF = 5;
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines a the life-cycle of a supervised Actor.
|
||||
*/
|
||||
enum LifeCycleProtocol {
|
||||
PERMANENT = 1;
|
||||
TEMPORARY = 2;
|
||||
}
|
||||
|
||||
/*
|
||||
enum DispatcherProtocol {
|
||||
GLOBAL_EVENT_EXECUTOR_BASED = 1;
|
||||
GLOBAL_REACTOR_SINGLE_THREAD_BASED = 2;
|
||||
GLOBAL_REACTOR_THREAD_POOL_BASED = 3;
|
||||
EVENT_EXECUTOR_BASED = 4;
|
||||
THREAD_BASED = 5;
|
||||
}
|
||||
*/
|
||||
|
||||
/**
|
||||
* Defines a remote address.
|
||||
*/
|
||||
message AddressProtocol {
|
||||
required string hostname = 1;
|
||||
required uint32 port = 2;
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines an exception.
|
||||
*/
|
||||
message ExceptionProtocol {
|
||||
required string classname = 1;
|
||||
required string message = 2;
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines a remote ActorRef that "remembers" and uses its original Actor instance.
|
||||
*/
|
||||
message RemoteActorRefProtocol {
|
||||
required string uuid = 1;
|
||||
required string actorClassName = 2;
|
||||
required AddressProtocol homeAddress = 3;
|
||||
optional uint64 timeout = 4;
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines a fully serialized remote ActorRef that is about to be instantiated on
|
||||
* the remote node. It is fully disconnected from its original host.
|
||||
*/
|
||||
message SerializedActorRefProtocol {
|
||||
required string uuid = 1;
|
||||
required string id = 2;
|
||||
required string actorClassName = 3;
|
||||
required bytes actorInstance = 4;
|
||||
required SerializationSchemeProtocol serializationScheme = 5;
|
||||
required AddressProtocol originalAddress = 6;
|
||||
optional bool isTransactor = 7;
|
||||
optional uint64 timeout = 8;
|
||||
optional LifeCycleProtocol lifeCycle = 9;
|
||||
optional RemoteActorRefProtocol supervisor = 10;
|
||||
optional bytes hotswapStack = 11;
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines a remote message request.
|
||||
*/
|
||||
message RemoteRequestProtocol {
|
||||
required uint64 id = 1;
|
||||
required SerializationSchemeProtocol serializationScheme = 2;
|
||||
required bytes message = 3;
|
||||
optional bytes messageManifest = 4;
|
||||
optional string method = 5;
|
||||
required string target = 6;
|
||||
required string uuid = 7;
|
||||
required uint64 timeout = 8;
|
||||
optional string supervisorUuid = 9;
|
||||
required bool isActor = 10;
|
||||
required bool isOneWay = 11;
|
||||
required bool isEscaped = 12;
|
||||
optional RemoteActorRefProtocol sender = 13;
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines a remote message reply.
|
||||
*/
|
||||
message RemoteReplyProtocol {
|
||||
required uint64 id = 1;
|
||||
optional SerializationSchemeProtocol serializationScheme = 2;
|
||||
optional bytes message = 3;
|
||||
optional bytes messageManifest = 4;
|
||||
optional ExceptionProtocol exception = 5;
|
||||
optional string supervisorUuid = 6;
|
||||
required bool isActor = 7;
|
||||
required bool isSuccessful = 8;
|
||||
}
|
||||
|
|
@ -5,7 +5,7 @@
|
|||
package se.scalablesolutions.akka.actor
|
||||
|
||||
import se.scalablesolutions.akka.config.FaultHandlingStrategy
|
||||
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol
|
||||
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol
|
||||
import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestProtocolIdFactory}
|
||||
import se.scalablesolutions.akka.dispatch.{MessageDispatcher, Future, CompletableFuture}
|
||||
import se.scalablesolutions.akka.config.ScalaConfig._
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ import se.scalablesolutions.akka.config.ScalaConfig._
|
|||
import se.scalablesolutions.akka.stm.Transaction.Global._
|
||||
import se.scalablesolutions.akka.stm.TransactionManagement._
|
||||
import se.scalablesolutions.akka.stm.TransactionManagement
|
||||
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequestProtocol, RemoteReplyProtocol, ActorRefProtocol}
|
||||
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._
|
||||
import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer, RemoteClient, RemoteProtocolBuilder, RemoteRequestProtocolIdFactory}
|
||||
import se.scalablesolutions.akka.serialization.Serializer
|
||||
import se.scalablesolutions.akka.util.{HashCode, Logging, UUID, ReentrantGuard}
|
||||
|
|
@ -51,20 +51,20 @@ object ActorRef {
|
|||
* Deserializes the ActorRef instance from a byte array (Array[Byte]) into an ActorRef instance.
|
||||
*/
|
||||
def fromBinary(bytes: Array[Byte]): ActorRef =
|
||||
fromProtobuf(ActorRefProtocol.newBuilder.mergeFrom(bytes).build, None)
|
||||
fromProtobuf(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, None)
|
||||
|
||||
def fromBinary(bytes: Array[Byte], loader: ClassLoader): ActorRef =
|
||||
fromProtobuf(ActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader))
|
||||
fromProtobuf(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader))
|
||||
|
||||
/**
|
||||
* Deserializes the ActorRef instance from a Protocol Buffers (protobuf) Message into an ActorRef instance.
|
||||
*/
|
||||
private[akka] def fromProtobuf(protocol: ActorRefProtocol, loader: Option[ClassLoader]): ActorRef =
|
||||
private[akka] def fromProtobuf(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef =
|
||||
RemoteActorRef(
|
||||
protocol.getUuid,
|
||||
protocol.getActorClassName,
|
||||
protocol.getSourceHostname,
|
||||
protocol.getSourcePort,
|
||||
protocol.getHomeAddress.getHostname,
|
||||
protocol.getHomeAddress.getPort,
|
||||
protocol.getTimeout,
|
||||
loader)
|
||||
}
|
||||
|
|
@ -517,7 +517,7 @@ trait ActorRef extends TransactionManagement {
|
|||
*/
|
||||
def shutdownLinkedActors: Unit
|
||||
|
||||
protected[akka] def toProtobuf: ActorRefProtocol
|
||||
protected[akka] def toProtobuf: RemoteActorRefProtocol
|
||||
|
||||
protected[akka] def invoke(messageHandle: MessageInvocation): Unit
|
||||
|
||||
|
|
@ -599,7 +599,7 @@ sealed class LocalActorRef private[akka](
|
|||
/**
|
||||
* Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message.
|
||||
*/
|
||||
protected[akka] def toProtobuf: ActorRefProtocol = guard.withGuard {
|
||||
protected[akka] def toProtobuf: RemoteActorRefProtocol = guard.withGuard {
|
||||
val host = homeAddress.getHostName
|
||||
val port = homeAddress.getPort
|
||||
|
||||
|
|
@ -609,12 +609,10 @@ sealed class LocalActorRef private[akka](
|
|||
RemoteServer.registerActor(homeAddress, uuid, this)
|
||||
registeredInRemoteNodeDuringSerialization = true
|
||||
}
|
||||
|
||||
ActorRefProtocol.newBuilder
|
||||
RemoteActorRefProtocol.newBuilder
|
||||
.setUuid(uuid)
|
||||
.setActorClassName(actorClass.getName)
|
||||
.setSourceHostname(host)
|
||||
.setSourcePort(port)
|
||||
.setHomeAddress(AddressProtocol.newBuilder.setHostname(host).setPort(port).build)
|
||||
.setTimeout(timeout)
|
||||
.build
|
||||
}
|
||||
|
|
@ -1279,7 +1277,7 @@ private[akka] case class RemoteActorRef private[akka] (
|
|||
def mailboxSize: Int = unsupported
|
||||
def supervisor: Option[ActorRef] = unsupported
|
||||
def shutdownLinkedActors: Unit = unsupported
|
||||
protected[akka] def toProtobuf: ActorRefProtocol = unsupported
|
||||
protected[akka] def toProtobuf: RemoteActorRefProtocol = unsupported
|
||||
protected[akka] def mailbox: Deque[MessageInvocation] = unsupported
|
||||
protected[akka] def restart(reason: Throwable): Unit = unsupported
|
||||
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@
|
|||
|
||||
package se.scalablesolutions.akka.remote
|
||||
|
||||
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequestProtocol, RemoteReplyProtocol}
|
||||
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._
|
||||
import se.scalablesolutions.akka.actor.{Exit, Actor, ActorRef, RemoteActorRef}
|
||||
import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture}
|
||||
import se.scalablesolutions.akka.util.{UUID, Logging}
|
||||
|
|
@ -359,12 +359,11 @@ class RemoteClientHandler(val name: String,
|
|||
event.getChannel.close
|
||||
}
|
||||
|
||||
private def parseException(reply: RemoteReplyProtocol) = {
|
||||
private def parseException(reply: RemoteReplyProtocol): Throwable = {
|
||||
val exception = reply.getException
|
||||
val exceptionType = Class.forName(exception.substring(0, exception.indexOf('$')))
|
||||
val exceptionMessage = exception.substring(exception.indexOf('$') + 1, exception.length)
|
||||
exceptionType
|
||||
val exceptionClass = Class.forName(exception.getClassname)
|
||||
exceptionClass
|
||||
.getConstructor(Array[Class[_]](classOf[String]): _*)
|
||||
.newInstance(exceptionMessage).asInstanceOf[Throwable]
|
||||
.newInstance(exception.getMessage).asInstanceOf[Throwable]
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,18 +4,17 @@
|
|||
|
||||
package se.scalablesolutions.akka.remote
|
||||
|
||||
import se.scalablesolutions.akka.serialization.Serializable.SBinary
|
||||
import se.scalablesolutions.akka.serialization.{Serializer, Serializable, SerializationProtocol}
|
||||
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequestProtocol, RemoteReplyProtocol}
|
||||
import se.scalablesolutions.akka.serialization.{Serializer, Serializable}
|
||||
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._
|
||||
|
||||
import com.google.protobuf.{Message, ByteString}
|
||||
|
||||
object RemoteProtocolBuilder {
|
||||
private var SERIALIZER_JAVA: Serializer.Java = Serializer.Java
|
||||
private var SERIALIZER_JAVA_JSON: Serializer.JavaJSON = Serializer.JavaJSON
|
||||
private var SERIALIZER_JAVA: Serializer.Java = Serializer.Java
|
||||
private var SERIALIZER_JAVA_JSON: Serializer.JavaJSON = Serializer.JavaJSON
|
||||
private var SERIALIZER_SCALA_JSON: Serializer.ScalaJSON = Serializer.ScalaJSON
|
||||
private var SERIALIZER_SBINARY: Serializer.SBinary = Serializer.SBinary
|
||||
private var SERIALIZER_PROTOBUF: Serializer.Protobuf = Serializer.Protobuf
|
||||
private var SERIALIZER_SBINARY: Serializer.SBinary = Serializer.SBinary
|
||||
private var SERIALIZER_PROTOBUF: Serializer.Protobuf = Serializer.Protobuf
|
||||
|
||||
def setClassLoader(cl: ClassLoader) = {
|
||||
SERIALIZER_JAVA.classLoader = Some(cl)
|
||||
|
|
@ -25,44 +24,44 @@ object RemoteProtocolBuilder {
|
|||
}
|
||||
|
||||
def getMessage(request: RemoteRequestProtocol): Any = {
|
||||
request.getProtocol match {
|
||||
case SerializationProtocol.JAVA =>
|
||||
request.getSerializationScheme match {
|
||||
case SerializationSchemeProtocol.JAVA =>
|
||||
unbox(SERIALIZER_JAVA.in(request.getMessage.toByteArray, None))
|
||||
case SerializationProtocol.SBINARY =>
|
||||
case SerializationSchemeProtocol.SBINARY =>
|
||||
val classToLoad = new String(request.getMessageManifest.toByteArray)
|
||||
val clazz = if (SERIALIZER_SBINARY.classLoader.isDefined) SERIALIZER_SBINARY.classLoader.get.loadClass(classToLoad)
|
||||
else Class.forName(classToLoad)
|
||||
val renderer = clazz.newInstance.asInstanceOf[SBinary[_ <: AnyRef]]
|
||||
val renderer = clazz.newInstance.asInstanceOf[Serializable.SBinary[_ <: AnyRef]]
|
||||
renderer.fromBytes(request.getMessage.toByteArray)
|
||||
case SerializationProtocol.SCALA_JSON =>
|
||||
case SerializationSchemeProtocol.SCALA_JSON =>
|
||||
val manifest = SERIALIZER_JAVA.in(request.getMessageManifest.toByteArray, None).asInstanceOf[String]
|
||||
SERIALIZER_SCALA_JSON.in(request.getMessage.toByteArray, Some(Class.forName(manifest)))
|
||||
case SerializationProtocol.JAVA_JSON =>
|
||||
case SerializationSchemeProtocol.JAVA_JSON =>
|
||||
val manifest = SERIALIZER_JAVA.in(request.getMessageManifest.toByteArray, None).asInstanceOf[String]
|
||||
SERIALIZER_JAVA_JSON.in(request.getMessage.toByteArray, Some(Class.forName(manifest)))
|
||||
case SerializationProtocol.PROTOBUF =>
|
||||
case SerializationSchemeProtocol.PROTOBUF =>
|
||||
val messageClass = SERIALIZER_JAVA.in(request.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]]
|
||||
SERIALIZER_PROTOBUF.in(request.getMessage.toByteArray, Some(messageClass))
|
||||
}
|
||||
}
|
||||
|
||||
def getMessage(reply: RemoteReplyProtocol): Any = {
|
||||
reply.getProtocol match {
|
||||
case SerializationProtocol.JAVA =>
|
||||
reply.getSerializationScheme match {
|
||||
case SerializationSchemeProtocol.JAVA =>
|
||||
unbox(SERIALIZER_JAVA.in(reply.getMessage.toByteArray, None))
|
||||
case SerializationProtocol.SBINARY =>
|
||||
case SerializationSchemeProtocol.SBINARY =>
|
||||
val classToLoad = new String(reply.getMessageManifest.toByteArray)
|
||||
val clazz = if (SERIALIZER_SBINARY.classLoader.isDefined) SERIALIZER_SBINARY.classLoader.get.loadClass(classToLoad)
|
||||
else Class.forName(classToLoad)
|
||||
val renderer = clazz.newInstance.asInstanceOf[SBinary[_ <: AnyRef]]
|
||||
val renderer = clazz.newInstance.asInstanceOf[Serializable.SBinary[_ <: AnyRef]]
|
||||
renderer.fromBytes(reply.getMessage.toByteArray)
|
||||
case SerializationProtocol.SCALA_JSON =>
|
||||
case SerializationSchemeProtocol.SCALA_JSON =>
|
||||
val manifest = SERIALIZER_JAVA.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[String]
|
||||
SERIALIZER_SCALA_JSON.in(reply.getMessage.toByteArray, Some(Class.forName(manifest)))
|
||||
case SerializationProtocol.JAVA_JSON =>
|
||||
case SerializationSchemeProtocol.JAVA_JSON =>
|
||||
val manifest = SERIALIZER_JAVA.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[String]
|
||||
SERIALIZER_JAVA_JSON.in(reply.getMessage.toByteArray, Some(Class.forName(manifest)))
|
||||
case SerializationProtocol.PROTOBUF =>
|
||||
case SerializationSchemeProtocol.PROTOBUF =>
|
||||
val messageClass = SERIALIZER_JAVA.in(reply.getMessageManifest.toByteArray, None).asInstanceOf[Class[_]]
|
||||
SERIALIZER_PROTOBUF.in(reply.getMessage.toByteArray, Some(messageClass))
|
||||
}
|
||||
|
|
@ -71,27 +70,27 @@ object RemoteProtocolBuilder {
|
|||
def setMessage(message: Any, builder: RemoteRequestProtocol.Builder) = {
|
||||
if (message.isInstanceOf[Serializable.SBinary[_]]) {
|
||||
val serializable = message.asInstanceOf[Serializable.SBinary[_ <: Any]]
|
||||
builder.setProtocol(SerializationProtocol.SBINARY)
|
||||
builder.setSerializationScheme(SerializationSchemeProtocol.SBINARY)
|
||||
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
|
||||
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
|
||||
} else if (message.isInstanceOf[Message]) {
|
||||
val serializable = message.asInstanceOf[Message]
|
||||
builder.setProtocol(SerializationProtocol.PROTOBUF)
|
||||
builder.setSerializationScheme(SerializationSchemeProtocol.PROTOBUF)
|
||||
builder.setMessage(ByteString.copyFrom(serializable.toByteArray))
|
||||
builder.setMessageManifest(ByteString.copyFrom(SERIALIZER_JAVA.out(serializable.getClass)))
|
||||
} else if (message.isInstanceOf[Serializable.ScalaJSON]) {
|
||||
val serializable = message.asInstanceOf[Serializable.ScalaJSON]
|
||||
builder.setProtocol(SerializationProtocol.SCALA_JSON)
|
||||
builder.setSerializationScheme(SerializationSchemeProtocol.SCALA_JSON)
|
||||
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
|
||||
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
|
||||
} else if (message.isInstanceOf[Serializable.JavaJSON]) {
|
||||
val serializable = message.asInstanceOf[Serializable.JavaJSON]
|
||||
builder.setProtocol(SerializationProtocol.JAVA_JSON)
|
||||
builder.setSerializationScheme(SerializationSchemeProtocol.JAVA_JSON)
|
||||
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
|
||||
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
|
||||
} else {
|
||||
// default, e.g. if no protocol used explicitly then use Java serialization
|
||||
builder.setProtocol(SerializationProtocol.JAVA)
|
||||
builder.setSerializationScheme(SerializationSchemeProtocol.JAVA)
|
||||
builder.setMessage(ByteString.copyFrom(SERIALIZER_JAVA.out(box(message))))
|
||||
}
|
||||
}
|
||||
|
|
@ -99,27 +98,27 @@ object RemoteProtocolBuilder {
|
|||
def setMessage(message: Any, builder: RemoteReplyProtocol.Builder) = {
|
||||
if (message.isInstanceOf[Serializable.SBinary[_]]) {
|
||||
val serializable = message.asInstanceOf[Serializable.SBinary[_ <: Any]]
|
||||
builder.setProtocol(SerializationProtocol.SBINARY)
|
||||
builder.setSerializationScheme(SerializationSchemeProtocol.SBINARY)
|
||||
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
|
||||
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
|
||||
} else if (message.isInstanceOf[Message]) {
|
||||
val serializable = message.asInstanceOf[Message]
|
||||
builder.setProtocol(SerializationProtocol.PROTOBUF)
|
||||
builder.setSerializationScheme(SerializationSchemeProtocol.PROTOBUF)
|
||||
builder.setMessage(ByteString.copyFrom(serializable.toByteArray))
|
||||
builder.setMessageManifest(ByteString.copyFrom(SERIALIZER_JAVA.out(serializable.getClass)))
|
||||
} else if (message.isInstanceOf[Serializable.ScalaJSON]) {
|
||||
val serializable = message.asInstanceOf[Serializable.ScalaJSON]
|
||||
builder.setProtocol(SerializationProtocol.SCALA_JSON)
|
||||
builder.setSerializationScheme(SerializationSchemeProtocol.SCALA_JSON)
|
||||
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
|
||||
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
|
||||
} else if (message.isInstanceOf[Serializable.JavaJSON]) {
|
||||
val serializable = message.asInstanceOf[Serializable.JavaJSON]
|
||||
builder.setProtocol(SerializationProtocol.JAVA_JSON)
|
||||
builder.setSerializationScheme(SerializationSchemeProtocol.JAVA_JSON)
|
||||
builder.setMessage(ByteString.copyFrom(serializable.toBytes))
|
||||
builder.setMessageManifest(ByteString.copyFrom(serializable.getClass.getName.getBytes))
|
||||
} else {
|
||||
// default, e.g. if no protocol used explicitly then use Java serialization
|
||||
builder.setProtocol(SerializationProtocol.JAVA)
|
||||
builder.setSerializationScheme(SerializationSchemeProtocol.JAVA)
|
||||
builder.setMessage(ByteString.copyFrom(SERIALIZER_JAVA.out(box(message))))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ import java.util.{Map => JMap}
|
|||
|
||||
import se.scalablesolutions.akka.actor._
|
||||
import se.scalablesolutions.akka.util._
|
||||
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol._
|
||||
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._
|
||||
import se.scalablesolutions.akka.config.Config.config
|
||||
|
||||
import org.jboss.netty.bootstrap.ServerBootstrap
|
||||
|
|
@ -385,7 +385,7 @@ class RemoteServerHandler(
|
|||
log.error(e, "Could not invoke remote actor [%s]", request.getTarget)
|
||||
val replyBuilder = RemoteReplyProtocol.newBuilder
|
||||
.setId(request.getId)
|
||||
.setException(e.getClass.getName + "$" + e.getMessage)
|
||||
.setException(ExceptionProtocol.newBuilder.setClassname(e.getClass.getName).setMessage(e.getMessage).build)
|
||||
.setIsSuccessful(false)
|
||||
.setIsActor(true)
|
||||
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
|
||||
|
|
@ -425,7 +425,7 @@ class RemoteServerHandler(
|
|||
log.error(e.getCause, "Could not invoke remote active object [%s :: %s]", request.getMethod, request.getTarget)
|
||||
val replyBuilder = RemoteReplyProtocol.newBuilder
|
||||
.setId(request.getId)
|
||||
.setException(e.getCause.getClass.getName + "$" + e.getCause.getMessage)
|
||||
.setException(ExceptionProtocol.newBuilder.setClassname(e.getClass.getName).setMessage(e.getMessage).build)
|
||||
.setIsSuccessful(false)
|
||||
.setIsActor(false)
|
||||
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
|
||||
|
|
@ -435,7 +435,7 @@ class RemoteServerHandler(
|
|||
log.error(e.getCause, "Could not invoke remote active object [%s :: %s]", request.getMethod, request.getTarget)
|
||||
val replyBuilder = RemoteReplyProtocol.newBuilder
|
||||
.setId(request.getId)
|
||||
.setException(e.getClass.getName + "$" + e.getMessage)
|
||||
.setException(ExceptionProtocol.newBuilder.setClassname(e.getClass.getName).setMessage(e.getMessage).build)
|
||||
.setIsSuccessful(false)
|
||||
.setIsActor(false)
|
||||
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
|
||||
|
|
|
|||
|
|
@ -15,14 +15,6 @@ import java.io.{StringWriter, ByteArrayOutputStream, ObjectOutputStream}
|
|||
|
||||
import sjson.json.{Serializer=>SJSONSerializer}
|
||||
|
||||
object SerializationProtocol {
|
||||
val JAVA = 0
|
||||
val SBINARY = 1
|
||||
val SCALA_JSON = 2
|
||||
val JAVA_JSON = 3
|
||||
val PROTOBUF = 4
|
||||
}
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue