Added fromProtobufToLocalActorRef serialization, all old test passing
This commit is contained in:
parent
48750e9dce
commit
c9b72284c1
4 changed files with 201 additions and 88 deletions
|
|
@ -11,6 +11,69 @@ option optimize_for = SPEED;
|
|||
protoc RemoteProtocol.proto --java_out ../java
|
||||
*******************************************/
|
||||
|
||||
/**
|
||||
* Defines a remote ActorRef that "remembers" and uses its original Actor instance
|
||||
* on the original node.
|
||||
*/
|
||||
message RemoteActorRefProtocol {
|
||||
required string uuid = 1;
|
||||
required string actorClassname = 2;
|
||||
required AddressProtocol homeAddress = 3;
|
||||
optional uint64 timeout = 4;
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines a fully serialized remote ActorRef (with serialized Actor instance)
|
||||
* that is about to be instantiated on the remote node. It is fully disconnected
|
||||
* from its original host.
|
||||
*/
|
||||
message SerializedActorRefProtocol {
|
||||
required string uuid = 1;
|
||||
required string id = 2;
|
||||
required string actorClassname = 3;
|
||||
required bytes actorInstance = 4;
|
||||
required string serializerClassname = 5;
|
||||
required AddressProtocol originalAddress = 6;
|
||||
optional bool isTransactor = 7;
|
||||
optional uint64 timeout = 8;
|
||||
optional LifeCycleProtocol lifeCycle = 9;
|
||||
optional RemoteActorRefProtocol supervisor = 10;
|
||||
optional bytes hotswapStack = 11;
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines a remote message request.
|
||||
*/
|
||||
message RemoteRequestProtocol {
|
||||
required uint64 id = 1;
|
||||
required SerializationSchemeType serializationScheme = 2;
|
||||
required bytes message = 3;
|
||||
optional bytes messageManifest = 4;
|
||||
optional string method = 5;
|
||||
required string target = 6;
|
||||
required string uuid = 7;
|
||||
required uint64 timeout = 8;
|
||||
optional string supervisorUuid = 9;
|
||||
required bool isActor = 10;
|
||||
required bool isOneWay = 11;
|
||||
required bool isEscaped = 12;
|
||||
optional RemoteActorRefProtocol sender = 13;
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines a remote message reply.
|
||||
*/
|
||||
message RemoteReplyProtocol {
|
||||
required uint64 id = 1;
|
||||
optional SerializationSchemeType serializationScheme = 2;
|
||||
optional bytes message = 3;
|
||||
optional bytes messageManifest = 4;
|
||||
optional ExceptionProtocol exception = 5;
|
||||
optional string supervisorUuid = 6;
|
||||
required bool isActor = 7;
|
||||
required bool isSuccessful = 8;
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines the serialization scheme used to serialize the message and/or Actor instance.
|
||||
*/
|
||||
|
|
@ -64,64 +127,3 @@ message ExceptionProtocol {
|
|||
required string classname = 1;
|
||||
required string message = 2;
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines a remote ActorRef that "remembers" and uses its original Actor instance.
|
||||
*/
|
||||
message RemoteActorRefProtocol {
|
||||
required string uuid = 1;
|
||||
required string actorClassname = 2;
|
||||
required AddressProtocol homeAddress = 3;
|
||||
optional uint64 timeout = 4;
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines a fully serialized remote ActorRef that is about to be instantiated on
|
||||
* the remote node. It is fully disconnected from its original host.
|
||||
*/
|
||||
message SerializedActorRefProtocol {
|
||||
required string uuid = 1;
|
||||
required string id = 2;
|
||||
required string actorClassname = 3;
|
||||
required bytes actorInstance = 4;
|
||||
required string serializerClassname = 5;
|
||||
required AddressProtocol originalAddress = 6;
|
||||
optional bool isTransactor = 7;
|
||||
optional uint64 timeout = 8;
|
||||
optional LifeCycleProtocol lifeCycle = 9;
|
||||
optional RemoteActorRefProtocol supervisor = 10;
|
||||
optional bytes hotswapStack = 11;
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines a remote message request.
|
||||
*/
|
||||
message RemoteRequestProtocol {
|
||||
required uint64 id = 1;
|
||||
required SerializationSchemeType serializationScheme = 2;
|
||||
required bytes message = 3;
|
||||
optional bytes messageManifest = 4;
|
||||
optional string method = 5;
|
||||
required string target = 6;
|
||||
required string uuid = 7;
|
||||
required uint64 timeout = 8;
|
||||
optional string supervisorUuid = 9;
|
||||
required bool isActor = 10;
|
||||
required bool isOneWay = 11;
|
||||
required bool isEscaped = 12;
|
||||
optional RemoteActorRefProtocol sender = 13;
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines a remote message reply.
|
||||
*/
|
||||
message RemoteReplyProtocol {
|
||||
required uint64 id = 1;
|
||||
optional SerializationSchemeType serializationScheme = 2;
|
||||
optional bytes message = 3;
|
||||
optional bytes messageManifest = 4;
|
||||
optional ExceptionProtocol exception = 5;
|
||||
optional string supervisorUuid = 6;
|
||||
required bool isActor = 7;
|
||||
required bool isSuccessful = 8;
|
||||
}
|
||||
|
|
@ -33,35 +33,48 @@ import com.google.protobuf.ByteString
|
|||
/**
|
||||
* The ActorRef object can be used to deserialize ActorRef instances from of its binary representation
|
||||
* or its Protocol Buffers (protobuf) Message representation to a Actor.actorOf instance.
|
||||
*
|
||||
* <p/>
|
||||
* Binary -> ActorRef:
|
||||
* <pre>
|
||||
* val actorRef = ActorRef.fromBinary(bytes)
|
||||
* actorRef ! message // send message to remote actor through its reference
|
||||
* </pre>
|
||||
*
|
||||
* <p/>
|
||||
* Protobuf Message -> ActorRef:
|
||||
* Protobuf Message -> RemoteActorRef:
|
||||
* <pre>
|
||||
* val actorRef = ActorRef.fromProtobuf(protobufMessage)
|
||||
* val actorRef = ActorRef.fromBinaryToRemoteActorRef(protobufMessage)
|
||||
* actorRef ! message // send message to remote actor through its reference
|
||||
* </pre>
|
||||
*
|
||||
* <p/>
|
||||
* Protobuf Message -> LocalActorRef:
|
||||
* <pre>
|
||||
* val actorRef = ActorRef.fromBinaryToLocalActorRef(protobufMessage)
|
||||
* actorRef ! message // send message to local actor through its reference
|
||||
* </pre>
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object ActorRef {
|
||||
|
||||
/**
|
||||
* Deserializes the ActorRef instance from a byte array (Array[Byte]) into an ActorRef instance.
|
||||
* Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance.
|
||||
*/
|
||||
def fromBinary(bytes: Array[Byte]): ActorRef =
|
||||
fromProtobuf(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, None)
|
||||
def fromBinaryToRemoteActorRef(bytes: Array[Byte]): ActorRef =
|
||||
fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, None)
|
||||
|
||||
def fromBinary(bytes: Array[Byte], loader: ClassLoader): ActorRef =
|
||||
fromProtobuf(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader))
|
||||
/**
|
||||
* Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance.
|
||||
*/
|
||||
def fromBinaryToRemoteActorRef(bytes: Array[Byte], loader: ClassLoader): ActorRef =
|
||||
fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader))
|
||||
|
||||
/**
|
||||
* Deserializes the ActorRef instance from a Protocol Buffers (protobuf) Message into an ActorRef instance.
|
||||
* Deserializes a RemoteActorRefProtocol Protocol Buffers (protobuf) Message into an RemoteActorRef instance.
|
||||
*/
|
||||
private[akka] def fromProtobuf(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef =
|
||||
private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef =
|
||||
RemoteActorRef(
|
||||
protocol.getUuid,
|
||||
protocol.getActorClassname,
|
||||
|
|
@ -69,6 +82,65 @@ object ActorRef {
|
|||
protocol.getHomeAddress.getPort,
|
||||
protocol.getTimeout,
|
||||
loader)
|
||||
|
||||
/**
|
||||
* Deserializes a byte array (Array[Byte]) into an LocalActorRef instance.
|
||||
*/
|
||||
def fromBinaryToLocalActorRef(bytes: Array[Byte]): ActorRef =
|
||||
fromProtobufToLocalActorRef(SerializedActorRefProtocol.newBuilder.mergeFrom(bytes).build, None)
|
||||
|
||||
/**
|
||||
* Deserializes a byte array (Array[Byte]) into an LocalActorRef instance.
|
||||
*/
|
||||
def fromBinaryToLocalActorRef(bytes: Array[Byte], loader: ClassLoader): ActorRef =
|
||||
fromProtobufToLocalActorRef(SerializedActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader))
|
||||
|
||||
/**
|
||||
* Deserializes a SerializedActorRefProtocol Protocol Buffers (protobuf) Message into an LocalActorRef instance.
|
||||
*/
|
||||
private[akka] def fromProtobufToLocalActorRef(protocol: SerializedActorRefProtocol, loader: Option[ClassLoader]): ActorRef = {
|
||||
val serializerClass =
|
||||
if (loader.isDefined) loader.get.loadClass(protocol.getSerializerClassname)
|
||||
else Class.forName(protocol.getSerializerClassname)
|
||||
val serializer = serializerClass.newInstance.asInstanceOf[Serializer]
|
||||
|
||||
val lifeCycle =
|
||||
if (protocol.hasLifeCycle) {
|
||||
val lifeCycleProtocol = protocol.getLifeCycle
|
||||
val restartCallbacks =
|
||||
if (lifeCycleProtocol.hasPreRestart || lifeCycleProtocol.hasPostRestart)
|
||||
Some(RestartCallbacks(lifeCycleProtocol.getPreRestart, lifeCycleProtocol.getPostRestart))
|
||||
else None
|
||||
Some(if (lifeCycleProtocol.getLifeCycle == LifeCycleType.PERMANENT) LifeCycle(Permanent, restartCallbacks)
|
||||
else if (lifeCycleProtocol.getLifeCycle == LifeCycleType.TEMPORARY) LifeCycle(Temporary, restartCallbacks)
|
||||
else throw new IllegalStateException("LifeCycle type is not valid: " + lifeCycleProtocol.getLifeCycle))
|
||||
} else None
|
||||
|
||||
val supervisor =
|
||||
if (protocol.hasSupervisor)
|
||||
Some(fromProtobufToRemoteActorRef(protocol.getSupervisor, loader))
|
||||
else None
|
||||
val hotswap =
|
||||
if (protocol.hasHotswapStack) Some(serializer
|
||||
.fromBinary(protocol.getHotswapStack.toByteArray, Some(classOf[PartialFunction[Any, Unit]]))
|
||||
.asInstanceOf[PartialFunction[Any, Unit]])
|
||||
else None
|
||||
|
||||
new LocalActorRef(
|
||||
protocol.getUuid,
|
||||
protocol.getId,
|
||||
protocol.getActorClassname,
|
||||
protocol.getActorInstance.toByteArray,
|
||||
protocol.getOriginalAddress.getHostname,
|
||||
protocol.getOriginalAddress.getPort,
|
||||
if (protocol.hasIsTransactor) protocol.getIsTransactor else false,
|
||||
if (protocol.hasTimeout) protocol.getTimeout else Actor.TIMEOUT,
|
||||
lifeCycle,
|
||||
supervisor,
|
||||
hotswap,
|
||||
loader.getOrElse(getClass.getClassLoader), // TODO: should we fall back to getClass.getClassLoader?
|
||||
serializer)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -138,19 +210,25 @@ trait ActorRef extends TransactionManagement {
|
|||
|
||||
/**
|
||||
* User overridable callback/setting.
|
||||
*
|
||||
* <p/>
|
||||
* Set trapExit to the list of exception classes that the actor should be able to trap
|
||||
* from the actor it is supervising. When the supervising actor throws these exceptions
|
||||
* then they will trigger a restart.
|
||||
* <p/>
|
||||
*
|
||||
* Trap no exceptions:
|
||||
* <pre>
|
||||
* // trap no exceptions
|
||||
* trapExit = Nil
|
||||
* </pre>
|
||||
*
|
||||
* // trap all exceptions
|
||||
* Trap all exceptions:
|
||||
* <pre>
|
||||
* trapExit = List(classOf[Throwable])
|
||||
* </pre>
|
||||
*
|
||||
* // trap specific exceptions only
|
||||
* Trap specific exceptions only:
|
||||
* <pre>
|
||||
* trapExit = List(classOf[MyApplicationException], classOf[MyApplicationError])
|
||||
* </pre>
|
||||
*/
|
||||
|
|
@ -160,10 +238,13 @@ trait ActorRef extends TransactionManagement {
|
|||
* User overridable callback/setting.
|
||||
* <p/>
|
||||
* If 'trapExit' is set for the actor to act as supervisor, then a faultHandler must be defined.
|
||||
* <p/>
|
||||
* Can be one of:
|
||||
* <pre/>
|
||||
* <pre>
|
||||
* faultHandler = Some(AllForOneStrategy(maxNrOfRetries, withinTimeRange))
|
||||
*
|
||||
* </pre>
|
||||
* Or:
|
||||
* <pre>
|
||||
* faultHandler = Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange))
|
||||
* </pre>
|
||||
*/
|
||||
|
|
@ -228,7 +309,7 @@ trait ActorRef extends TransactionManagement {
|
|||
* The reference sender future of the last received message.
|
||||
* Is defined if the message was sent with sent with '!!' or '!!!', else None.
|
||||
*/
|
||||
def senderFuture: Option[CompletableFuture[Any]] = guard.withGuard { _senderFuture }
|
||||
def senderFuture: Option[CompletableFuture[Any]] = guard.withGuard { _senderFuture }
|
||||
|
||||
/**
|
||||
* Is the actor being restarted?
|
||||
|
|
@ -586,16 +667,47 @@ trait ActorRef extends TransactionManagement {
|
|||
}
|
||||
|
||||
/**
|
||||
* Local ActorRef that is used when referencing the Actor on its "home" node.
|
||||
* Local (serializable) ActorRef that is used when referencing the Actor on its "home" node.
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
sealed class LocalActorRef private[akka](
|
||||
private[this] var actorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None))
|
||||
extends ActorRef {
|
||||
|
||||
private var isDeserialized = false
|
||||
private var loader: Option[ClassLoader] = None
|
||||
|
||||
private[akka] def this(clazz: Class[_ <: Actor]) = this(Left(Some(clazz)))
|
||||
private[akka] def this(factory: () => Actor) = this(Right(Some(factory)))
|
||||
|
||||
// used only for deserialization
|
||||
private[akka] def this(__uuid: String,
|
||||
__id: String,
|
||||
__actorClassName: String,
|
||||
__actorBytes: Array[Byte],
|
||||
__hostname: String,
|
||||
__port: Int,
|
||||
__isTransactor: Boolean,
|
||||
__timeout: Long,
|
||||
__lifeCycle: Option[LifeCycle],
|
||||
__supervisor: Option[ActorRef],
|
||||
__hotswap: Option[PartialFunction[Any, Unit]],
|
||||
__loader: ClassLoader,
|
||||
__serializer: Serializer) = {
|
||||
this(() => __serializer.fromBinary(__actorBytes, Some(__loader.loadClass(__actorClassName))).asInstanceOf[Actor])
|
||||
loader = Some(__loader)
|
||||
isDeserialized = true
|
||||
_uuid = __uuid
|
||||
id = __id
|
||||
homeAddress = (__hostname, __port)
|
||||
isTransactor = __isTransactor
|
||||
timeout = __timeout
|
||||
lifeCycle = __lifeCycle
|
||||
_supervisor = __supervisor
|
||||
hotswap = __hotswap
|
||||
ActorRegistry.register(this)
|
||||
}
|
||||
|
||||
// Only mutable for RemoteServer in order to maintain identity across nodes
|
||||
@volatile private[akka] var _remoteAddress: Option[InetSocketAddress] = None
|
||||
|
|
@ -612,7 +724,7 @@ sealed class LocalActorRef private[akka](
|
|||
// instance elegible for garbage collection
|
||||
private val actorSelfFields = findActorSelfField(actor.getClass)
|
||||
|
||||
if (runActorInitialization) initializeActorInstance
|
||||
if (runActorInitialization && !isDeserialized) initializeActorInstance
|
||||
|
||||
/**
|
||||
* Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message.
|
||||
|
|
@ -637,7 +749,8 @@ sealed class LocalActorRef private[akka](
|
|||
}
|
||||
|
||||
protected[akka] def toSerializedActorRefProtocol: SerializedActorRefProtocol = guard.withGuard {
|
||||
if (!isSerializable) throw new IllegalStateException("Can't serialize an ActorRef using SerializedActorRefProtocol that is wrapping an Actor that is not mixing in the SerializableActor trait")
|
||||
if (!isSerializable) throw new IllegalStateException(
|
||||
"Can't serialize an ActorRef using SerializedActorRefProtocol\nthat is wrapping an Actor that is not mixing in the SerializableActor trait")
|
||||
|
||||
val lifeCycleProtocol: Option[LifeCycleProtocol] = {
|
||||
def setScope(builder: LifeCycleProtocol.Builder, scope: Scope) = scope match {
|
||||
|
|
@ -654,8 +767,7 @@ sealed class LocalActorRef private[akka](
|
|||
builder.setPreRestart(callbacks.preRestart)
|
||||
builder.setPostRestart(callbacks.postRestart)
|
||||
Some(builder.build)
|
||||
case None =>
|
||||
None
|
||||
case None => None
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -323,9 +323,8 @@ class RemoteServerHandler(
|
|||
applicationLoader.foreach(RemoteProtocolBuilder.setClassLoader(_))
|
||||
|
||||
/**
|
||||
* ChannelOpen overridden to store open channels for a clean shutdown
|
||||
* of a RemoteServer. If a channel is closed before, it is
|
||||
* automatically removed from the open channels group.
|
||||
* ChannelOpen overridden to store open channels for a clean shutdown of a RemoteServer.
|
||||
* If a channel is closed before, it is automatically removed from the open channels group.
|
||||
*/
|
||||
override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) {
|
||||
openChannels.add(ctx.getChannel)
|
||||
|
|
@ -364,8 +363,9 @@ class RemoteServerHandler(
|
|||
val actorRef = createActor(request.getTarget, request.getUuid, request.getTimeout)
|
||||
actorRef.start
|
||||
val message = RemoteProtocolBuilder.getMessage(request)
|
||||
val sender = if (request.hasSender) Some(ActorRef.fromProtobuf(request.getSender, applicationLoader))
|
||||
else None
|
||||
val sender =
|
||||
if (request.hasSender) Some(ActorRef.fromProtobufToRemoteActorRef(request.getSender, applicationLoader))
|
||||
else None
|
||||
if (request.getIsOneWay) actorRef.!(message)(sender)
|
||||
else {
|
||||
try {
|
||||
|
|
@ -403,7 +403,6 @@ class RemoteServerHandler(
|
|||
val argClasses = args.map(_.getClass)
|
||||
val (unescapedArgs, unescapedArgClasses) = unescapeArgs(args, argClasses, request.getTimeout)
|
||||
|
||||
//continueTransaction(request)
|
||||
try {
|
||||
val messageReceiver = activeObject.getClass.getDeclaredMethod(
|
||||
request.getMethod, unescapedArgClasses: _*)
|
||||
|
|
|
|||
|
|
@ -103,7 +103,7 @@ class StmSpec extends
|
|||
}
|
||||
|
||||
describe("Transactor") {
|
||||
it("should be able receive message sent with !! and pass it along to nested transactor with !! and receive reply; multipse times in a row") {
|
||||
it("should be able receive message sent with !! and pass it along to nested transactor with !! and receive reply; multiple times in a row") {
|
||||
import GlobalTransactionVectorTestActor._
|
||||
try {
|
||||
val actor = actorOf[NestedTransactorLevelOneActor].start
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue