diff --git a/akka-core/src/main/protocol/RemoteProtocol.proto b/akka-core/src/main/protocol/RemoteProtocol.proto index eb3b9826cb..e967d8b3ac 100644 --- a/akka-core/src/main/protocol/RemoteProtocol.proto +++ b/akka-core/src/main/protocol/RemoteProtocol.proto @@ -11,6 +11,69 @@ option optimize_for = SPEED; protoc RemoteProtocol.proto --java_out ../java *******************************************/ +/** + * Defines a remote ActorRef that "remembers" and uses its original Actor instance + * on the original node. + */ +message RemoteActorRefProtocol { + required string uuid = 1; + required string actorClassname = 2; + required AddressProtocol homeAddress = 3; + optional uint64 timeout = 4; +} + +/** + * Defines a fully serialized remote ActorRef (with serialized Actor instance) + * that is about to be instantiated on the remote node. It is fully disconnected + * from its original host. + */ +message SerializedActorRefProtocol { + required string uuid = 1; + required string id = 2; + required string actorClassname = 3; + required bytes actorInstance = 4; + required string serializerClassname = 5; + required AddressProtocol originalAddress = 6; + optional bool isTransactor = 7; + optional uint64 timeout = 8; + optional LifeCycleProtocol lifeCycle = 9; + optional RemoteActorRefProtocol supervisor = 10; + optional bytes hotswapStack = 11; +} + +/** + * Defines a remote message request. + */ +message RemoteRequestProtocol { + required uint64 id = 1; + required SerializationSchemeType serializationScheme = 2; + required bytes message = 3; + optional bytes messageManifest = 4; + optional string method = 5; + required string target = 6; + required string uuid = 7; + required uint64 timeout = 8; + optional string supervisorUuid = 9; + required bool isActor = 10; + required bool isOneWay = 11; + required bool isEscaped = 12; + optional RemoteActorRefProtocol sender = 13; +} + +/** + * Defines a remote message reply. + */ +message RemoteReplyProtocol { + required uint64 id = 1; + optional SerializationSchemeType serializationScheme = 2; + optional bytes message = 3; + optional bytes messageManifest = 4; + optional ExceptionProtocol exception = 5; + optional string supervisorUuid = 6; + required bool isActor = 7; + required bool isSuccessful = 8; +} + /** * Defines the serialization scheme used to serialize the message and/or Actor instance. */ @@ -64,64 +127,3 @@ message ExceptionProtocol { required string classname = 1; required string message = 2; } - -/** - * Defines a remote ActorRef that "remembers" and uses its original Actor instance. - */ -message RemoteActorRefProtocol { - required string uuid = 1; - required string actorClassname = 2; - required AddressProtocol homeAddress = 3; - optional uint64 timeout = 4; -} - -/** - * Defines a fully serialized remote ActorRef that is about to be instantiated on - * the remote node. It is fully disconnected from its original host. - */ -message SerializedActorRefProtocol { - required string uuid = 1; - required string id = 2; - required string actorClassname = 3; - required bytes actorInstance = 4; - required string serializerClassname = 5; - required AddressProtocol originalAddress = 6; - optional bool isTransactor = 7; - optional uint64 timeout = 8; - optional LifeCycleProtocol lifeCycle = 9; - optional RemoteActorRefProtocol supervisor = 10; - optional bytes hotswapStack = 11; -} - -/** - * Defines a remote message request. - */ -message RemoteRequestProtocol { - required uint64 id = 1; - required SerializationSchemeType serializationScheme = 2; - required bytes message = 3; - optional bytes messageManifest = 4; - optional string method = 5; - required string target = 6; - required string uuid = 7; - required uint64 timeout = 8; - optional string supervisorUuid = 9; - required bool isActor = 10; - required bool isOneWay = 11; - required bool isEscaped = 12; - optional RemoteActorRefProtocol sender = 13; -} - -/** - * Defines a remote message reply. - */ -message RemoteReplyProtocol { - required uint64 id = 1; - optional SerializationSchemeType serializationScheme = 2; - optional bytes message = 3; - optional bytes messageManifest = 4; - optional ExceptionProtocol exception = 5; - optional string supervisorUuid = 6; - required bool isActor = 7; - required bool isSuccessful = 8; -} \ No newline at end of file diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index 9763b37109..8951ec1ad9 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -33,35 +33,48 @@ import com.google.protobuf.ByteString /** * The ActorRef object can be used to deserialize ActorRef instances from of its binary representation * or its Protocol Buffers (protobuf) Message representation to a Actor.actorOf instance. + * *
* Binary -> ActorRef: ** val actorRef = ActorRef.fromBinary(bytes) * actorRef ! message // send message to remote actor through its reference *+ * * - * Protobuf Message -> ActorRef: + * Protobuf Message -> RemoteActorRef: *
- * val actorRef = ActorRef.fromProtobuf(protobufMessage) + * val actorRef = ActorRef.fromBinaryToRemoteActorRef(protobufMessage) * actorRef ! message // send message to remote actor through its reference *+ * + * + * Protobuf Message -> LocalActorRef: + *
+ * val actorRef = ActorRef.fromBinaryToLocalActorRef(protobufMessage) + * actorRef ! message // send message to local actor through its reference + *+ * * @author Jonas Bonér */ object ActorRef { /** - * Deserializes the ActorRef instance from a byte array (Array[Byte]) into an ActorRef instance. + * Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance. */ - def fromBinary(bytes: Array[Byte]): ActorRef = - fromProtobuf(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, None) + def fromBinaryToRemoteActorRef(bytes: Array[Byte]): ActorRef = + fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, None) - def fromBinary(bytes: Array[Byte], loader: ClassLoader): ActorRef = - fromProtobuf(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader)) + /** + * Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance. + */ + def fromBinaryToRemoteActorRef(bytes: Array[Byte], loader: ClassLoader): ActorRef = + fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader)) /** - * Deserializes the ActorRef instance from a Protocol Buffers (protobuf) Message into an ActorRef instance. + * Deserializes a RemoteActorRefProtocol Protocol Buffers (protobuf) Message into an RemoteActorRef instance. */ - private[akka] def fromProtobuf(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = + private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = RemoteActorRef( protocol.getUuid, protocol.getActorClassname, @@ -69,6 +82,65 @@ object ActorRef { protocol.getHomeAddress.getPort, protocol.getTimeout, loader) + + /** + * Deserializes a byte array (Array[Byte]) into an LocalActorRef instance. + */ + def fromBinaryToLocalActorRef(bytes: Array[Byte]): ActorRef = + fromProtobufToLocalActorRef(SerializedActorRefProtocol.newBuilder.mergeFrom(bytes).build, None) + + /** + * Deserializes a byte array (Array[Byte]) into an LocalActorRef instance. + */ + def fromBinaryToLocalActorRef(bytes: Array[Byte], loader: ClassLoader): ActorRef = + fromProtobufToLocalActorRef(SerializedActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader)) + + /** + * Deserializes a SerializedActorRefProtocol Protocol Buffers (protobuf) Message into an LocalActorRef instance. + */ + private[akka] def fromProtobufToLocalActorRef(protocol: SerializedActorRefProtocol, loader: Option[ClassLoader]): ActorRef = { + val serializerClass = + if (loader.isDefined) loader.get.loadClass(protocol.getSerializerClassname) + else Class.forName(protocol.getSerializerClassname) + val serializer = serializerClass.newInstance.asInstanceOf[Serializer] + + val lifeCycle = + if (protocol.hasLifeCycle) { + val lifeCycleProtocol = protocol.getLifeCycle + val restartCallbacks = + if (lifeCycleProtocol.hasPreRestart || lifeCycleProtocol.hasPostRestart) + Some(RestartCallbacks(lifeCycleProtocol.getPreRestart, lifeCycleProtocol.getPostRestart)) + else None + Some(if (lifeCycleProtocol.getLifeCycle == LifeCycleType.PERMANENT) LifeCycle(Permanent, restartCallbacks) + else if (lifeCycleProtocol.getLifeCycle == LifeCycleType.TEMPORARY) LifeCycle(Temporary, restartCallbacks) + else throw new IllegalStateException("LifeCycle type is not valid: " + lifeCycleProtocol.getLifeCycle)) + } else None + + val supervisor = + if (protocol.hasSupervisor) + Some(fromProtobufToRemoteActorRef(protocol.getSupervisor, loader)) + else None + val hotswap = + if (protocol.hasHotswapStack) Some(serializer + .fromBinary(protocol.getHotswapStack.toByteArray, Some(classOf[PartialFunction[Any, Unit]])) + .asInstanceOf[PartialFunction[Any, Unit]]) + else None + + new LocalActorRef( + protocol.getUuid, + protocol.getId, + protocol.getActorClassname, + protocol.getActorInstance.toByteArray, + protocol.getOriginalAddress.getHostname, + protocol.getOriginalAddress.getPort, + if (protocol.hasIsTransactor) protocol.getIsTransactor else false, + if (protocol.hasTimeout) protocol.getTimeout else Actor.TIMEOUT, + lifeCycle, + supervisor, + hotswap, + loader.getOrElse(getClass.getClassLoader), // TODO: should we fall back to getClass.getClassLoader? + serializer) + } } /** @@ -138,19 +210,25 @@ trait ActorRef extends TransactionManagement { /** * User overridable callback/setting. + * * * Set trapExit to the list of exception classes that the actor should be able to trap * from the actor it is supervising. When the supervising actor throws these exceptions * then they will trigger a restart. * + * + * Trap no exceptions: *
- * // trap no exceptions
* trapExit = Nil
+ *
*
- * // trap all exceptions
+ * Trap all exceptions:
+ *
* trapExit = List(classOf[Throwable])
+ *
*
- * // trap specific exceptions only
+ * Trap specific exceptions only:
+ *
* trapExit = List(classOf[MyApplicationException], classOf[MyApplicationError])
*
*/
@@ -160,10 +238,13 @@ trait ActorRef extends TransactionManagement {
* User overridable callback/setting.
*
* If 'trapExit' is set for the actor to act as supervisor, then a faultHandler must be defined.
+ *
* Can be one of:
- *
+ *
* faultHandler = Some(AllForOneStrategy(maxNrOfRetries, withinTimeRange))
- *
+ *
+ * Or:
+ *
* faultHandler = Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange))
*
*/
@@ -228,7 +309,7 @@ trait ActorRef extends TransactionManagement {
* The reference sender future of the last received message.
* Is defined if the message was sent with sent with '!!' or '!!!', else None.
*/
- def senderFuture: Option[CompletableFuture[Any]] = guard.withGuard { _senderFuture }
+ def senderFuture: Option[CompletableFuture[Any]] = guard.withGuard { _senderFuture }
/**
* Is the actor being restarted?
@@ -586,16 +667,47 @@ trait ActorRef extends TransactionManagement {
}
/**
- * Local ActorRef that is used when referencing the Actor on its "home" node.
+ * Local (serializable) ActorRef that is used when referencing the Actor on its "home" node.
*
* @author Jonas Bonér
*/
sealed class LocalActorRef private[akka](
private[this] var actorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None))
extends ActorRef {
+
+ private var isDeserialized = false
+ private var loader: Option[ClassLoader] = None
private[akka] def this(clazz: Class[_ <: Actor]) = this(Left(Some(clazz)))
private[akka] def this(factory: () => Actor) = this(Right(Some(factory)))
+
+ // used only for deserialization
+ private[akka] def this(__uuid: String,
+ __id: String,
+ __actorClassName: String,
+ __actorBytes: Array[Byte],
+ __hostname: String,
+ __port: Int,
+ __isTransactor: Boolean,
+ __timeout: Long,
+ __lifeCycle: Option[LifeCycle],
+ __supervisor: Option[ActorRef],
+ __hotswap: Option[PartialFunction[Any, Unit]],
+ __loader: ClassLoader,
+ __serializer: Serializer) = {
+ this(() => __serializer.fromBinary(__actorBytes, Some(__loader.loadClass(__actorClassName))).asInstanceOf[Actor])
+ loader = Some(__loader)
+ isDeserialized = true
+ _uuid = __uuid
+ id = __id
+ homeAddress = (__hostname, __port)
+ isTransactor = __isTransactor
+ timeout = __timeout
+ lifeCycle = __lifeCycle
+ _supervisor = __supervisor
+ hotswap = __hotswap
+ ActorRegistry.register(this)
+ }
// Only mutable for RemoteServer in order to maintain identity across nodes
@volatile private[akka] var _remoteAddress: Option[InetSocketAddress] = None
@@ -612,7 +724,7 @@ sealed class LocalActorRef private[akka](
// instance elegible for garbage collection
private val actorSelfFields = findActorSelfField(actor.getClass)
- if (runActorInitialization) initializeActorInstance
+ if (runActorInitialization && !isDeserialized) initializeActorInstance
/**
* Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message.
@@ -637,7 +749,8 @@ sealed class LocalActorRef private[akka](
}
protected[akka] def toSerializedActorRefProtocol: SerializedActorRefProtocol = guard.withGuard {
- if (!isSerializable) throw new IllegalStateException("Can't serialize an ActorRef using SerializedActorRefProtocol that is wrapping an Actor that is not mixing in the SerializableActor trait")
+ if (!isSerializable) throw new IllegalStateException(
+ "Can't serialize an ActorRef using SerializedActorRefProtocol\nthat is wrapping an Actor that is not mixing in the SerializableActor trait")
val lifeCycleProtocol: Option[LifeCycleProtocol] = {
def setScope(builder: LifeCycleProtocol.Builder, scope: Scope) = scope match {
@@ -654,8 +767,7 @@ sealed class LocalActorRef private[akka](
builder.setPreRestart(callbacks.preRestart)
builder.setPostRestart(callbacks.postRestart)
Some(builder.build)
- case None =>
- None
+ case None => None
}
}
diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala
index af02a5a4e7..0d1540ab9a 100644
--- a/akka-core/src/main/scala/remote/RemoteServer.scala
+++ b/akka-core/src/main/scala/remote/RemoteServer.scala
@@ -323,9 +323,8 @@ class RemoteServerHandler(
applicationLoader.foreach(RemoteProtocolBuilder.setClassLoader(_))
/**
- * ChannelOpen overridden to store open channels for a clean shutdown
- * of a RemoteServer. If a channel is closed before, it is
- * automatically removed from the open channels group.
+ * ChannelOpen overridden to store open channels for a clean shutdown of a RemoteServer.
+ * If a channel is closed before, it is automatically removed from the open channels group.
*/
override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) {
openChannels.add(ctx.getChannel)
@@ -364,8 +363,9 @@ class RemoteServerHandler(
val actorRef = createActor(request.getTarget, request.getUuid, request.getTimeout)
actorRef.start
val message = RemoteProtocolBuilder.getMessage(request)
- val sender = if (request.hasSender) Some(ActorRef.fromProtobuf(request.getSender, applicationLoader))
- else None
+ val sender =
+ if (request.hasSender) Some(ActorRef.fromProtobufToRemoteActorRef(request.getSender, applicationLoader))
+ else None
if (request.getIsOneWay) actorRef.!(message)(sender)
else {
try {
@@ -403,7 +403,6 @@ class RemoteServerHandler(
val argClasses = args.map(_.getClass)
val (unescapedArgs, unescapedArgClasses) = unescapeArgs(args, argClasses, request.getTimeout)
- //continueTransaction(request)
try {
val messageReceiver = activeObject.getClass.getDeclaredMethod(
request.getMethod, unescapedArgClasses: _*)
diff --git a/akka-core/src/test/scala/StmSpec.scala b/akka-core/src/test/scala/StmSpec.scala
index 17d4be32bd..b7537b83b2 100644
--- a/akka-core/src/test/scala/StmSpec.scala
+++ b/akka-core/src/test/scala/StmSpec.scala
@@ -103,7 +103,7 @@ class StmSpec extends
}
describe("Transactor") {
- it("should be able receive message sent with !! and pass it along to nested transactor with !! and receive reply; multipse times in a row") {
+ it("should be able receive message sent with !! and pass it along to nested transactor with !! and receive reply; multiple times in a row") {
import GlobalTransactionVectorTestActor._
try {
val actor = actorOf[NestedTransactorLevelOneActor].start