Added fromProtobufToLocalActorRef serialization, all old test passing

This commit is contained in:
Jonas Bonér 2010-06-15 13:15:00 +02:00
parent 068a6d7e12
commit ac5b0882ea
4 changed files with 201 additions and 88 deletions

View file

@ -11,6 +11,69 @@ option optimize_for = SPEED;
protoc RemoteProtocol.proto --java_out ../java
*******************************************/
/**
* Defines a remote ActorRef that "remembers" and uses its original Actor instance
* on the original node.
*/
message RemoteActorRefProtocol {
required string uuid = 1;
required string actorClassname = 2;
required AddressProtocol homeAddress = 3;
optional uint64 timeout = 4;
}
/**
* Defines a fully serialized remote ActorRef (with serialized Actor instance)
* that is about to be instantiated on the remote node. It is fully disconnected
* from its original host.
*/
message SerializedActorRefProtocol {
required string uuid = 1;
required string id = 2;
required string actorClassname = 3;
required bytes actorInstance = 4;
required string serializerClassname = 5;
required AddressProtocol originalAddress = 6;
optional bool isTransactor = 7;
optional uint64 timeout = 8;
optional LifeCycleProtocol lifeCycle = 9;
optional RemoteActorRefProtocol supervisor = 10;
optional bytes hotswapStack = 11;
}
/**
* Defines a remote message request.
*/
message RemoteRequestProtocol {
required uint64 id = 1;
required SerializationSchemeType serializationScheme = 2;
required bytes message = 3;
optional bytes messageManifest = 4;
optional string method = 5;
required string target = 6;
required string uuid = 7;
required uint64 timeout = 8;
optional string supervisorUuid = 9;
required bool isActor = 10;
required bool isOneWay = 11;
required bool isEscaped = 12;
optional RemoteActorRefProtocol sender = 13;
}
/**
* Defines a remote message reply.
*/
message RemoteReplyProtocol {
required uint64 id = 1;
optional SerializationSchemeType serializationScheme = 2;
optional bytes message = 3;
optional bytes messageManifest = 4;
optional ExceptionProtocol exception = 5;
optional string supervisorUuid = 6;
required bool isActor = 7;
required bool isSuccessful = 8;
}
/**
* Defines the serialization scheme used to serialize the message and/or Actor instance.
*/
@ -64,64 +127,3 @@ message ExceptionProtocol {
required string classname = 1;
required string message = 2;
}
/**
* Defines a remote ActorRef that "remembers" and uses its original Actor instance.
*/
message RemoteActorRefProtocol {
required string uuid = 1;
required string actorClassname = 2;
required AddressProtocol homeAddress = 3;
optional uint64 timeout = 4;
}
/**
* Defines a fully serialized remote ActorRef that is about to be instantiated on
* the remote node. It is fully disconnected from its original host.
*/
message SerializedActorRefProtocol {
required string uuid = 1;
required string id = 2;
required string actorClassname = 3;
required bytes actorInstance = 4;
required string serializerClassname = 5;
required AddressProtocol originalAddress = 6;
optional bool isTransactor = 7;
optional uint64 timeout = 8;
optional LifeCycleProtocol lifeCycle = 9;
optional RemoteActorRefProtocol supervisor = 10;
optional bytes hotswapStack = 11;
}
/**
* Defines a remote message request.
*/
message RemoteRequestProtocol {
required uint64 id = 1;
required SerializationSchemeType serializationScheme = 2;
required bytes message = 3;
optional bytes messageManifest = 4;
optional string method = 5;
required string target = 6;
required string uuid = 7;
required uint64 timeout = 8;
optional string supervisorUuid = 9;
required bool isActor = 10;
required bool isOneWay = 11;
required bool isEscaped = 12;
optional RemoteActorRefProtocol sender = 13;
}
/**
* Defines a remote message reply.
*/
message RemoteReplyProtocol {
required uint64 id = 1;
optional SerializationSchemeType serializationScheme = 2;
optional bytes message = 3;
optional bytes messageManifest = 4;
optional ExceptionProtocol exception = 5;
optional string supervisorUuid = 6;
required bool isActor = 7;
required bool isSuccessful = 8;
}

View file

@ -33,35 +33,48 @@ import com.google.protobuf.ByteString
/**
* The ActorRef object can be used to deserialize ActorRef instances from of its binary representation
* or its Protocol Buffers (protobuf) Message representation to a Actor.actorOf instance.
*
* <p/>
* Binary -> ActorRef:
* <pre>
* val actorRef = ActorRef.fromBinary(bytes)
* actorRef ! message // send message to remote actor through its reference
* </pre>
*
* <p/>
* Protobuf Message -> ActorRef:
* Protobuf Message -> RemoteActorRef:
* <pre>
* val actorRef = ActorRef.fromProtobuf(protobufMessage)
* val actorRef = ActorRef.fromBinaryToRemoteActorRef(protobufMessage)
* actorRef ! message // send message to remote actor through its reference
* </pre>
*
* <p/>
* Protobuf Message -> LocalActorRef:
* <pre>
* val actorRef = ActorRef.fromBinaryToLocalActorRef(protobufMessage)
* actorRef ! message // send message to local actor through its reference
* </pre>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object ActorRef {
/**
* Deserializes the ActorRef instance from a byte array (Array[Byte]) into an ActorRef instance.
* Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance.
*/
def fromBinary(bytes: Array[Byte]): ActorRef =
fromProtobuf(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, None)
def fromBinaryToRemoteActorRef(bytes: Array[Byte]): ActorRef =
fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, None)
def fromBinary(bytes: Array[Byte], loader: ClassLoader): ActorRef =
fromProtobuf(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader))
/**
* Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance.
*/
def fromBinaryToRemoteActorRef(bytes: Array[Byte], loader: ClassLoader): ActorRef =
fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader))
/**
* Deserializes the ActorRef instance from a Protocol Buffers (protobuf) Message into an ActorRef instance.
* Deserializes a RemoteActorRefProtocol Protocol Buffers (protobuf) Message into an RemoteActorRef instance.
*/
private[akka] def fromProtobuf(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef =
private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef =
RemoteActorRef(
protocol.getUuid,
protocol.getActorClassname,
@ -69,6 +82,65 @@ object ActorRef {
protocol.getHomeAddress.getPort,
protocol.getTimeout,
loader)
/**
* Deserializes a byte array (Array[Byte]) into an LocalActorRef instance.
*/
def fromBinaryToLocalActorRef(bytes: Array[Byte]): ActorRef =
fromProtobufToLocalActorRef(SerializedActorRefProtocol.newBuilder.mergeFrom(bytes).build, None)
/**
* Deserializes a byte array (Array[Byte]) into an LocalActorRef instance.
*/
def fromBinaryToLocalActorRef(bytes: Array[Byte], loader: ClassLoader): ActorRef =
fromProtobufToLocalActorRef(SerializedActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader))
/**
* Deserializes a SerializedActorRefProtocol Protocol Buffers (protobuf) Message into an LocalActorRef instance.
*/
private[akka] def fromProtobufToLocalActorRef(protocol: SerializedActorRefProtocol, loader: Option[ClassLoader]): ActorRef = {
val serializerClass =
if (loader.isDefined) loader.get.loadClass(protocol.getSerializerClassname)
else Class.forName(protocol.getSerializerClassname)
val serializer = serializerClass.newInstance.asInstanceOf[Serializer]
val lifeCycle =
if (protocol.hasLifeCycle) {
val lifeCycleProtocol = protocol.getLifeCycle
val restartCallbacks =
if (lifeCycleProtocol.hasPreRestart || lifeCycleProtocol.hasPostRestart)
Some(RestartCallbacks(lifeCycleProtocol.getPreRestart, lifeCycleProtocol.getPostRestart))
else None
Some(if (lifeCycleProtocol.getLifeCycle == LifeCycleType.PERMANENT) LifeCycle(Permanent, restartCallbacks)
else if (lifeCycleProtocol.getLifeCycle == LifeCycleType.TEMPORARY) LifeCycle(Temporary, restartCallbacks)
else throw new IllegalStateException("LifeCycle type is not valid: " + lifeCycleProtocol.getLifeCycle))
} else None
val supervisor =
if (protocol.hasSupervisor)
Some(fromProtobufToRemoteActorRef(protocol.getSupervisor, loader))
else None
val hotswap =
if (protocol.hasHotswapStack) Some(serializer
.fromBinary(protocol.getHotswapStack.toByteArray, Some(classOf[PartialFunction[Any, Unit]]))
.asInstanceOf[PartialFunction[Any, Unit]])
else None
new LocalActorRef(
protocol.getUuid,
protocol.getId,
protocol.getActorClassname,
protocol.getActorInstance.toByteArray,
protocol.getOriginalAddress.getHostname,
protocol.getOriginalAddress.getPort,
if (protocol.hasIsTransactor) protocol.getIsTransactor else false,
if (protocol.hasTimeout) protocol.getTimeout else Actor.TIMEOUT,
lifeCycle,
supervisor,
hotswap,
loader.getOrElse(getClass.getClassLoader), // TODO: should we fall back to getClass.getClassLoader?
serializer)
}
}
/**
@ -138,19 +210,25 @@ trait ActorRef extends TransactionManagement {
/**
* User overridable callback/setting.
*
* <p/>
* Set trapExit to the list of exception classes that the actor should be able to trap
* from the actor it is supervising. When the supervising actor throws these exceptions
* then they will trigger a restart.
* <p/>
*
* Trap no exceptions:
* <pre>
* // trap no exceptions
* trapExit = Nil
* </pre>
*
* // trap all exceptions
* Trap all exceptions:
* <pre>
* trapExit = List(classOf[Throwable])
* </pre>
*
* // trap specific exceptions only
* Trap specific exceptions only:
* <pre>
* trapExit = List(classOf[MyApplicationException], classOf[MyApplicationError])
* </pre>
*/
@ -160,10 +238,13 @@ trait ActorRef extends TransactionManagement {
* User overridable callback/setting.
* <p/>
* If 'trapExit' is set for the actor to act as supervisor, then a faultHandler must be defined.
* <p/>
* Can be one of:
* <pre/>
* <pre>
* faultHandler = Some(AllForOneStrategy(maxNrOfRetries, withinTimeRange))
*
* </pre>
* Or:
* <pre>
* faultHandler = Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange))
* </pre>
*/
@ -228,7 +309,7 @@ trait ActorRef extends TransactionManagement {
* The reference sender future of the last received message.
* Is defined if the message was sent with sent with '!!' or '!!!', else None.
*/
def senderFuture: Option[CompletableFuture[Any]] = guard.withGuard { _senderFuture }
def senderFuture: Option[CompletableFuture[Any]] = guard.withGuard { _senderFuture }
/**
* Is the actor being restarted?
@ -586,16 +667,47 @@ trait ActorRef extends TransactionManagement {
}
/**
* Local ActorRef that is used when referencing the Actor on its "home" node.
* Local (serializable) ActorRef that is used when referencing the Actor on its "home" node.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
sealed class LocalActorRef private[akka](
private[this] var actorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None))
extends ActorRef {
private var isDeserialized = false
private var loader: Option[ClassLoader] = None
private[akka] def this(clazz: Class[_ <: Actor]) = this(Left(Some(clazz)))
private[akka] def this(factory: () => Actor) = this(Right(Some(factory)))
// used only for deserialization
private[akka] def this(__uuid: String,
__id: String,
__actorClassName: String,
__actorBytes: Array[Byte],
__hostname: String,
__port: Int,
__isTransactor: Boolean,
__timeout: Long,
__lifeCycle: Option[LifeCycle],
__supervisor: Option[ActorRef],
__hotswap: Option[PartialFunction[Any, Unit]],
__loader: ClassLoader,
__serializer: Serializer) = {
this(() => __serializer.fromBinary(__actorBytes, Some(__loader.loadClass(__actorClassName))).asInstanceOf[Actor])
loader = Some(__loader)
isDeserialized = true
_uuid = __uuid
id = __id
homeAddress = (__hostname, __port)
isTransactor = __isTransactor
timeout = __timeout
lifeCycle = __lifeCycle
_supervisor = __supervisor
hotswap = __hotswap
ActorRegistry.register(this)
}
// Only mutable for RemoteServer in order to maintain identity across nodes
@volatile private[akka] var _remoteAddress: Option[InetSocketAddress] = None
@ -612,7 +724,7 @@ sealed class LocalActorRef private[akka](
// instance elegible for garbage collection
private val actorSelfFields = findActorSelfField(actor.getClass)
if (runActorInitialization) initializeActorInstance
if (runActorInitialization && !isDeserialized) initializeActorInstance
/**
* Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message.
@ -637,7 +749,8 @@ sealed class LocalActorRef private[akka](
}
protected[akka] def toSerializedActorRefProtocol: SerializedActorRefProtocol = guard.withGuard {
if (!isSerializable) throw new IllegalStateException("Can't serialize an ActorRef using SerializedActorRefProtocol that is wrapping an Actor that is not mixing in the SerializableActor trait")
if (!isSerializable) throw new IllegalStateException(
"Can't serialize an ActorRef using SerializedActorRefProtocol\nthat is wrapping an Actor that is not mixing in the SerializableActor trait")
val lifeCycleProtocol: Option[LifeCycleProtocol] = {
def setScope(builder: LifeCycleProtocol.Builder, scope: Scope) = scope match {
@ -654,8 +767,7 @@ sealed class LocalActorRef private[akka](
builder.setPreRestart(callbacks.preRestart)
builder.setPostRestart(callbacks.postRestart)
Some(builder.build)
case None =>
None
case None => None
}
}

View file

@ -323,9 +323,8 @@ class RemoteServerHandler(
applicationLoader.foreach(RemoteProtocolBuilder.setClassLoader(_))
/**
* ChannelOpen overridden to store open channels for a clean shutdown
* of a RemoteServer. If a channel is closed before, it is
* automatically removed from the open channels group.
* ChannelOpen overridden to store open channels for a clean shutdown of a RemoteServer.
* If a channel is closed before, it is automatically removed from the open channels group.
*/
override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) {
openChannels.add(ctx.getChannel)
@ -364,8 +363,9 @@ class RemoteServerHandler(
val actorRef = createActor(request.getTarget, request.getUuid, request.getTimeout)
actorRef.start
val message = RemoteProtocolBuilder.getMessage(request)
val sender = if (request.hasSender) Some(ActorRef.fromProtobuf(request.getSender, applicationLoader))
else None
val sender =
if (request.hasSender) Some(ActorRef.fromProtobufToRemoteActorRef(request.getSender, applicationLoader))
else None
if (request.getIsOneWay) actorRef.!(message)(sender)
else {
try {
@ -403,7 +403,6 @@ class RemoteServerHandler(
val argClasses = args.map(_.getClass)
val (unescapedArgs, unescapedArgClasses) = unescapeArgs(args, argClasses, request.getTimeout)
//continueTransaction(request)
try {
val messageReceiver = activeObject.getClass.getDeclaredMethod(
request.getMethod, unescapedArgClasses: _*)

View file

@ -103,7 +103,7 @@ class StmSpec extends
}
describe("Transactor") {
it("should be able receive message sent with !! and pass it along to nested transactor with !! and receive reply; multipse times in a row") {
it("should be able receive message sent with !! and pass it along to nested transactor with !! and receive reply; multiple times in a row") {
import GlobalTransactionVectorTestActor._
try {
val actor = actorOf[NestedTransactorLevelOneActor].start