diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index ec4cd938ad..321095d84c 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -11,8 +11,10 @@ import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.stm.Transaction.Global._ import se.scalablesolutions.akka.stm.TransactionManagement._ import se.scalablesolutions.akka.stm.TransactionManagement -import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest -import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer, RemoteClient, RemoteProtocolBuilder, RemoteRequestIdFactory} +import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol +import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer, RemoteClient, + RemoteActorProxy, RemoteProtocolBuilder, + RemoteRequestIdFactory} import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.util.{HashCode, Logging, UUID} @@ -242,10 +244,34 @@ object Actor extends Logging { } +/** + * The ActorID object can be used to create ActorID instances out of its binary + * protobuf based representation. + *
+ * val actorRef = ActorID.fromBinary(bytes) + * actorRef ! message // send message to remote actor through its reference + *+ * + * @author Jonas Bonér + */ +object ActorID { + def fromBinary(bytes: Array[Byte]): ActorID = { + val actorRefProto = RemoteProtocol.ActorRef.newBuilder.mergeFrom(bytes).build + RemoteActorProxy( + actorRefProto.getUuid, + actorRefProto.getActorClassName, + actorRefProto.getSourceHostname, + actorRefProto.getSourcePort, + actorRefProto.getTimeout) + } +} + /** * ActorID is an immutable and serializable handle to an Actor. + * * Create an ActorID for an Actor by using the factory method on the Actor object. - * Here is an example: + * + * Here is an example on how to create an actor with a default constructor. *
* import Actor._
*
@@ -254,24 +280,23 @@ object Actor extends Logging {
* actor ! message
* actor.stop
*
+ * Here is an example on how to create an actor with a non-default constructor.
+ * + * import Actor._ + * + * val actor = newActor(() => new MyActor(...)) + * actor.start + * actor ! message + * actor.stop + ** * @author Jonas Bonér */ final class ActorID private[akka] () { - private[akka] var newActorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None) + private[akka] var actorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None) - private[akka] def this(clazz: Class[_ <: Actor]) = { - this() - newActorFactory = Left(Some(clazz)) - } - - private[akka] def this(factory: () => Actor) = { - this() - newActorFactory = Right(Some(factory)) - } - private[akka] lazy val actor: Actor = { - val actor = newActorFactory match { + val actor = actorFactory match { case Left(Some(clazz)) => try { clazz.newInstance @@ -290,6 +315,30 @@ final class ActorID private[akka] () { actor } + private[akka] def this(clazz: Class[_ <: Actor]) = { + this() + actorFactory = Left(Some(clazz)) + } + + private[akka] def this(factory: () => Actor) = { + this() + actorFactory = Right(Some(factory)) + } + + def toBinary: Array[Byte] = { + if (!actor._registeredInRemoteNodeDuringSerialization) { + RemoteNode.register(uuid, this) + actor._registeredInRemoteNodeDuringSerialization = true + } + RemoteProtocol.ActorRef.newBuilder + .setUuid(uuid) + .setActorClassName(actorClass.getName) + .setSourceHostname(RemoteServer.HOSTNAME) + .setSourcePort(RemoteServer.PORT) + .setTimeout(timeout) + .build.toByteArray + } + /** * Returns the class for the Actor instance that is managed by the ActorID. */ @@ -546,6 +595,7 @@ trait Actor extends TransactionManagement with Logging { @volatile private[this] var _isSuspended = true @volatile private[this] var _isShutDown = false @volatile private[akka] var _isKilled = false + @volatile private[akka] var _registeredInRemoteNodeDuringSerialization = true private var _hotswap: Option[PartialFunction[Any, Unit]] = None private[akka] var _remoteAddress: Option[InetSocketAddress] = None private[akka] var _linkedActors: Option[HashSet[ActorID]] = None @@ -755,6 +805,7 @@ trait Actor extends TransactionManagement with Logging { shutdown ActorRegistry.unregister(self) _remoteAddress.foreach(address => RemoteClient.unregister(address.getHostName, address.getPort, uuid)) + RemoteNode.unregister(self) } } @@ -1011,7 +1062,7 @@ trait Actor extends TransactionManagement with Logging { joinTransaction(message) if (_remoteAddress.isDefined) { - val requestBuilder = RemoteRequest.newBuilder + val requestBuilder = RemoteProtocol.RemoteRequest.newBuilder .setId(RemoteRequestIdFactory.nextId) .setTarget(this.getClass.getName) .setTimeout(this.timeout) @@ -1062,7 +1113,7 @@ trait Actor extends TransactionManagement with Logging { joinTransaction(message) if (_remoteAddress.isDefined) { - val requestBuilder = RemoteRequest.newBuilder + val requestBuilder = RemoteProtocol.RemoteRequest.newBuilder .setId(RemoteRequestIdFactory.nextId) .setTarget(this.getClass.getName) .setTimeout(this.timeout) @@ -1079,7 +1130,8 @@ trait Actor extends TransactionManagement with Logging { } else { val future = if (senderFuture.isDefined) senderFuture.get else new DefaultCompletableFuture[T](timeout) - val invocation = new MessageInvocation(self, message, Some(Right(future.asInstanceOf[CompletableFuture[Any]])), transactionSet.get) + val invocation = new MessageInvocation( + self, message, Some(Right(future.asInstanceOf[CompletableFuture[Any]])), transactionSet.get) if (messageDispatcher.usesActorMailbox) _mailbox.add(invocation) diff --git a/akka-core/src/main/scala/actor/ActorIdProtobufSpec.proto b/akka-core/src/main/scala/actor/ActorIdProtobufSpec.proto new file mode 100644 index 0000000000..e69de29bb2 diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index 9c03683771..4948b3cb72 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -27,6 +27,8 @@ import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable.{HashSet, HashMap} /** + * Atomic remote request/reply message id generator. + * * @author Jonas Bonér */ object RemoteRequestIdFactory { @@ -41,6 +43,69 @@ case class RemoteClientError(cause: Throwable) extends RemoteClientLifeCycleEven case class RemoteClientDisconnected(host: String, port: Int) extends RemoteClientLifeCycleEvent case class RemoteClientConnected(host: String, port: Int) extends RemoteClientLifeCycleEvent +/** + * Remote Actor proxy factory. + * + * @author Jonas Bonér + */ +private[akka] object RemoteActorProxy { + def apply(uuid: String, className: String, hostname: String, port: Int, timeout: Long): ActorID = + new ActorID(() => new RemoteActorProxy(uuid, className, hostname, port, timeout)) +} + +/** + * Remote Actor proxy. + * + * @author Jonas Bonér + */ +private[akka] class RemoteActorProxy private ( + uuid: String, className: String, hostname: String, port: Int, timeOut: Long) extends Actor { + start + val remoteClient = RemoteClient.clientFor(hostname, port) + + override def postMessageToMailbox(message: Any, senderOption: Option[ActorID]): Unit = { + val requestBuilder = RemoteRequest.newBuilder + .setId(RemoteRequestIdFactory.nextId) + .setTarget(className) + .setTimeout(timeOut) + .setUuid(uuid) + .setIsActor(true) + .setIsOneWay(true) + .setIsEscaped(false) + if (senderOption.isDefined) { + val sender = senderOption.get.actor + requestBuilder.setSourceTarget(sender.getClass.getName) + requestBuilder.setSourceUuid(sender.uuid) + val (host, port) = sender._replyToAddress.map(address => + (address.getHostName, address.getPort)).getOrElse((Actor.HOSTNAME, Actor.PORT)) + requestBuilder.setSourceHostname(host) + requestBuilder.setSourcePort(port) + } + RemoteProtocolBuilder.setMessage(message, requestBuilder) + remoteClient.send[Any](requestBuilder.build, None) + } + + override def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( + message: Any, + timeout: Long, + senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { + val requestBuilder = RemoteRequest.newBuilder + .setId(RemoteRequestIdFactory.nextId) + .setTarget(className) + .setTimeout(timeout) + .setUuid(uuid) + .setIsActor(true) + .setIsOneWay(false) + .setIsEscaped(false) + RemoteProtocolBuilder.setMessage(message, requestBuilder) + val future = remoteClient.send(requestBuilder.build, senderFuture) + if (future.isDefined) future.get + else throw new IllegalStateException("Expected a future from remote call to actor " + toString) + } + + def receive = {case _ => {}} +} + /** * @author Jonas Bonér */ @@ -62,53 +127,8 @@ object RemoteClient extends Logging { def actorFor(className: String, timeout: Long, hostname: String, port: Int): ActorID = actorFor(className, className, timeout, hostname, port) - def actorFor(actorId: String, className: String, timeout: Long, hostname: String, port: Int): ActorID = new ActorID(() => - new Actor { - start - val remoteClient = RemoteClient.clientFor(hostname, port) - - override def postMessageToMailbox(message: Any, sender: Option[ActorID]): Unit = { - val requestBuilder = RemoteRequest.newBuilder - .setId(RemoteRequestIdFactory.nextId) - .setTarget(className) - .setTimeout(timeout) - .setUuid(actorId) - .setIsActor(true) - .setIsOneWay(true) - .setIsEscaped(false) - if (sender.isDefined) { - val sndr = sender.get.actor - requestBuilder.setSourceTarget(sndr.getClass.getName) - requestBuilder.setSourceUuid(sndr.uuid) - val (host, port) = sndr._replyToAddress.map(a => (a.getHostName, a.getPort)).getOrElse((Actor.HOSTNAME, Actor.PORT)) - requestBuilder.setSourceHostname(host) - requestBuilder.setSourcePort(port) - } - RemoteProtocolBuilder.setMessage(message, requestBuilder) - remoteClient.send[Any](requestBuilder.build, None) - } - - override def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( - message: Any, - timeout: Long, - senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { - val requestBuilder = RemoteRequest.newBuilder - .setId(RemoteRequestIdFactory.nextId) - .setTarget(className) - .setTimeout(timeout) - .setUuid(actorId) - .setIsActor(true) - .setIsOneWay(false) - .setIsEscaped(false) - RemoteProtocolBuilder.setMessage(message, requestBuilder) - val future = remoteClient.send(requestBuilder.build, senderFuture) - if (future.isDefined) future.get - else throw new IllegalStateException("Expected a future from remote call to actor " + toString) - } - - def receive = {case _ => {}} - } - ) + def actorFor(actorId: String, className: String, timeout: Long, hostname: String, port: Int): ActorID = + RemoteActorProxy(actorId, className, hostname, port, timeout) def clientFor(hostname: String, port: Int): RemoteClient = clientFor(new InetSocketAddress(hostname, port)) diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 64e72f3fe8..f84a7a9ad9 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -168,7 +168,8 @@ class RemoteServer extends Logging { log.info("Starting remote server at [%s:%s]", hostname, port) RemoteServer.register(hostname, port, this) val remoteActorSet = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)) - val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, loader, remoteActorSet.actors, remoteActorSet.activeObjects) + val pipelineFactory = new RemoteServerPipelineFactory( + name, openChannels, loader, remoteActorSet.actors, remoteActorSet.activeObjects) bootstrap.setPipelineFactory(pipelineFactory) bootstrap.setOption("child.tcpNoDelay", true) bootstrap.setOption("child.keepAlive", true) @@ -200,7 +201,7 @@ class RemoteServer extends Logging { */ def register(actor: ActorID) = synchronized { if (_isRunning) { - log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, actor.id) + log.info("Registering server side remote actor [%s] with id [%s]", actor.actorClass.getName, actor.id) RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actor.id, actor) } } @@ -210,10 +211,22 @@ class RemoteServer extends Logging { */ def register(id: String, actor: ActorID) = synchronized { if (_isRunning) { - log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, id) + log.info("Registering server side remote actor [%s] with id [%s]", actor.actorClass.getName, id) RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(id, actor) } } + + /** + * Unregister Remote Actor. + */ + def unregister(actor: ActorID) = synchronized { + if (_isRunning) { + log.info("Unregistering server side remote actor [%s] with id [%s]", actor.actorClass.getName, actor.id) + val server = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)) + server.actors.put(actor.id, actor) + if (actor.actor._registeredInRemoteNodeDuringSerialization) server.actors.remove(actor.uuid) + } + } } case class Codec(encoder : ChannelHandler, decoder : ChannelHandler) diff --git a/akka-util-java/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.java b/akka-util-java/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.java index f8ee893393..724978ef12 100644 --- a/akka-util-java/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.java +++ b/akka-util-java/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.java @@ -7,6 +7,443 @@ public final class RemoteProtocol { public static void registerAllExtensions( com.google.protobuf.ExtensionRegistry registry) { } + public static final class ActorRef extends + com.google.protobuf.GeneratedMessage { + // Use ActorRef.newBuilder() to construct. + private ActorRef() {} + + private static final ActorRef defaultInstance = new ActorRef(); + public static ActorRef getDefaultInstance() { + return defaultInstance; + } + + public ActorRef getDefaultInstanceForType() { + return defaultInstance; + } + + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRef_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRef_fieldAccessorTable; + } + + // required string uuid = 1; + public static final int UUID_FIELD_NUMBER = 1; + private boolean hasUuid; + private java.lang.String uuid_ = ""; + public boolean hasUuid() { return hasUuid; } + public java.lang.String getUuid() { return uuid_; } + + // required string actorClassName = 2; + public static final int ACTORCLASSNAME_FIELD_NUMBER = 2; + private boolean hasActorClassName; + private java.lang.String actorClassName_ = ""; + public boolean hasActorClassName() { return hasActorClassName; } + public java.lang.String getActorClassName() { return actorClassName_; } + + // required string sourceHostname = 3; + public static final int SOURCEHOSTNAME_FIELD_NUMBER = 3; + private boolean hasSourceHostname; + private java.lang.String sourceHostname_ = ""; + public boolean hasSourceHostname() { return hasSourceHostname; } + public java.lang.String getSourceHostname() { return sourceHostname_; } + + // required uint32 sourcePort = 4; + public static final int SOURCEPORT_FIELD_NUMBER = 4; + private boolean hasSourcePort; + private int sourcePort_ = 0; + public boolean hasSourcePort() { return hasSourcePort; } + public int getSourcePort() { return sourcePort_; } + + // required uint64 timeout = 5; + public static final int TIMEOUT_FIELD_NUMBER = 5; + private boolean hasTimeout; + private long timeout_ = 0L; + public boolean hasTimeout() { return hasTimeout; } + public long getTimeout() { return timeout_; } + + public final boolean isInitialized() { + if (!hasUuid) return false; + if (!hasActorClassName) return false; + if (!hasSourceHostname) return false; + if (!hasSourcePort) return false; + if (!hasTimeout) return false; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + if (hasUuid()) { + output.writeString(1, getUuid()); + } + if (hasActorClassName()) { + output.writeString(2, getActorClassName()); + } + if (hasSourceHostname()) { + output.writeString(3, getSourceHostname()); + } + if (hasSourcePort()) { + output.writeUInt32(4, getSourcePort()); + } + if (hasTimeout()) { + output.writeUInt64(5, getTimeout()); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (hasUuid()) { + size += com.google.protobuf.CodedOutputStream + .computeStringSize(1, getUuid()); + } + if (hasActorClassName()) { + size += com.google.protobuf.CodedOutputStream + .computeStringSize(2, getActorClassName()); + } + if (hasSourceHostname()) { + size += com.google.protobuf.CodedOutputStream + .computeStringSize(3, getSourceHostname()); + } + if (hasSourcePort()) { + size += com.google.protobuf.CodedOutputStream + .computeUInt32Size(4, getSourcePort()); + } + if (hasTimeout()) { + size += com.google.protobuf.CodedOutputStream + .computeUInt64Size(5, getTimeout()); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeDelimitedFrom(input).buildParsed(); + } + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeDelimitedFrom(input, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder