From f90e9c3f1a2d62f0e4470a409b952cac107b8e3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Wed, 5 May 2010 22:45:19 +0200 Subject: [PATCH 1/3] Add Protobuf serialization and deserialization of ActorID --- akka-core/src/main/scala/actor/Actor.scala | 88 +++- .../scala/actor/ActorIdProtobufSpec.proto | 0 .../src/main/scala/remote/RemoteClient.scala | 114 +++-- .../src/main/scala/remote/RemoteServer.scala | 19 +- .../akka/remote/protobuf/RemoteProtocol.java | 483 +++++++++++++++++- .../akka/remote/protobuf/RemoteProtocol.proto | 8 + 6 files changed, 629 insertions(+), 83 deletions(-) create mode 100644 akka-core/src/main/scala/actor/ActorIdProtobufSpec.proto diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index ec4cd938ad..321095d84c 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -11,8 +11,10 @@ import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.stm.Transaction.Global._ import se.scalablesolutions.akka.stm.TransactionManagement._ import se.scalablesolutions.akka.stm.TransactionManagement -import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest -import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer, RemoteClient, RemoteProtocolBuilder, RemoteRequestIdFactory} +import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol +import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer, RemoteClient, + RemoteActorProxy, RemoteProtocolBuilder, + RemoteRequestIdFactory} import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.util.{HashCode, Logging, UUID} @@ -242,10 +244,34 @@ object Actor extends Logging { } +/** + * The ActorID object can be used to create ActorID instances out of its binary + * protobuf based representation. + *
+ *   val actorRef = ActorID.fromBinary(bytes)
+ *   actorRef ! message // send message to remote actor through its reference
+ * 
+ * + * @author Jonas Bonér + */ +object ActorID { + def fromBinary(bytes: Array[Byte]): ActorID = { + val actorRefProto = RemoteProtocol.ActorRef.newBuilder.mergeFrom(bytes).build + RemoteActorProxy( + actorRefProto.getUuid, + actorRefProto.getActorClassName, + actorRefProto.getSourceHostname, + actorRefProto.getSourcePort, + actorRefProto.getTimeout) + } +} + /** * ActorID is an immutable and serializable handle to an Actor. + *

* Create an ActorID for an Actor by using the factory method on the Actor object. - * Here is an example: + *

+ * Here is an example on how to create an actor with a default constructor. *

  *   import Actor._
  * 
@@ -254,24 +280,23 @@ object Actor extends Logging {
  *   actor ! message
  *   actor.stop
  * 
+ * Here is an example on how to create an actor with a non-default constructor. + *
+ *   import Actor._
+ * 
+ *   val actor = newActor(() => new MyActor(...))
+ *   actor.start
+ *   actor ! message
+ *   actor.stop
+ * 
* * @author Jonas Bonér */ final class ActorID private[akka] () { - private[akka] var newActorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None) + private[akka] var actorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None) - private[akka] def this(clazz: Class[_ <: Actor]) = { - this() - newActorFactory = Left(Some(clazz)) - } - - private[akka] def this(factory: () => Actor) = { - this() - newActorFactory = Right(Some(factory)) - } - private[akka] lazy val actor: Actor = { - val actor = newActorFactory match { + val actor = actorFactory match { case Left(Some(clazz)) => try { clazz.newInstance @@ -290,6 +315,30 @@ final class ActorID private[akka] () { actor } + private[akka] def this(clazz: Class[_ <: Actor]) = { + this() + actorFactory = Left(Some(clazz)) + } + + private[akka] def this(factory: () => Actor) = { + this() + actorFactory = Right(Some(factory)) + } + + def toBinary: Array[Byte] = { + if (!actor._registeredInRemoteNodeDuringSerialization) { + RemoteNode.register(uuid, this) + actor._registeredInRemoteNodeDuringSerialization = true + } + RemoteProtocol.ActorRef.newBuilder + .setUuid(uuid) + .setActorClassName(actorClass.getName) + .setSourceHostname(RemoteServer.HOSTNAME) + .setSourcePort(RemoteServer.PORT) + .setTimeout(timeout) + .build.toByteArray + } + /** * Returns the class for the Actor instance that is managed by the ActorID. */ @@ -546,6 +595,7 @@ trait Actor extends TransactionManagement with Logging { @volatile private[this] var _isSuspended = true @volatile private[this] var _isShutDown = false @volatile private[akka] var _isKilled = false + @volatile private[akka] var _registeredInRemoteNodeDuringSerialization = true private var _hotswap: Option[PartialFunction[Any, Unit]] = None private[akka] var _remoteAddress: Option[InetSocketAddress] = None private[akka] var _linkedActors: Option[HashSet[ActorID]] = None @@ -755,6 +805,7 @@ trait Actor extends TransactionManagement with Logging { shutdown ActorRegistry.unregister(self) _remoteAddress.foreach(address => RemoteClient.unregister(address.getHostName, address.getPort, uuid)) + RemoteNode.unregister(self) } } @@ -1011,7 +1062,7 @@ trait Actor extends TransactionManagement with Logging { joinTransaction(message) if (_remoteAddress.isDefined) { - val requestBuilder = RemoteRequest.newBuilder + val requestBuilder = RemoteProtocol.RemoteRequest.newBuilder .setId(RemoteRequestIdFactory.nextId) .setTarget(this.getClass.getName) .setTimeout(this.timeout) @@ -1062,7 +1113,7 @@ trait Actor extends TransactionManagement with Logging { joinTransaction(message) if (_remoteAddress.isDefined) { - val requestBuilder = RemoteRequest.newBuilder + val requestBuilder = RemoteProtocol.RemoteRequest.newBuilder .setId(RemoteRequestIdFactory.nextId) .setTarget(this.getClass.getName) .setTimeout(this.timeout) @@ -1079,7 +1130,8 @@ trait Actor extends TransactionManagement with Logging { } else { val future = if (senderFuture.isDefined) senderFuture.get else new DefaultCompletableFuture[T](timeout) - val invocation = new MessageInvocation(self, message, Some(Right(future.asInstanceOf[CompletableFuture[Any]])), transactionSet.get) + val invocation = new MessageInvocation( + self, message, Some(Right(future.asInstanceOf[CompletableFuture[Any]])), transactionSet.get) if (messageDispatcher.usesActorMailbox) _mailbox.add(invocation) diff --git a/akka-core/src/main/scala/actor/ActorIdProtobufSpec.proto b/akka-core/src/main/scala/actor/ActorIdProtobufSpec.proto new file mode 100644 index 0000000000..e69de29bb2 diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index 9c03683771..4948b3cb72 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -27,6 +27,8 @@ import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable.{HashSet, HashMap} /** + * Atomic remote request/reply message id generator. + * * @author Jonas Bonér */ object RemoteRequestIdFactory { @@ -41,6 +43,69 @@ case class RemoteClientError(cause: Throwable) extends RemoteClientLifeCycleEven case class RemoteClientDisconnected(host: String, port: Int) extends RemoteClientLifeCycleEvent case class RemoteClientConnected(host: String, port: Int) extends RemoteClientLifeCycleEvent +/** + * Remote Actor proxy factory. + * + * @author Jonas Bonér + */ +private[akka] object RemoteActorProxy { + def apply(uuid: String, className: String, hostname: String, port: Int, timeout: Long): ActorID = + new ActorID(() => new RemoteActorProxy(uuid, className, hostname, port, timeout)) +} + +/** + * Remote Actor proxy. + * + * @author Jonas Bonér + */ +private[akka] class RemoteActorProxy private ( + uuid: String, className: String, hostname: String, port: Int, timeOut: Long) extends Actor { + start + val remoteClient = RemoteClient.clientFor(hostname, port) + + override def postMessageToMailbox(message: Any, senderOption: Option[ActorID]): Unit = { + val requestBuilder = RemoteRequest.newBuilder + .setId(RemoteRequestIdFactory.nextId) + .setTarget(className) + .setTimeout(timeOut) + .setUuid(uuid) + .setIsActor(true) + .setIsOneWay(true) + .setIsEscaped(false) + if (senderOption.isDefined) { + val sender = senderOption.get.actor + requestBuilder.setSourceTarget(sender.getClass.getName) + requestBuilder.setSourceUuid(sender.uuid) + val (host, port) = sender._replyToAddress.map(address => + (address.getHostName, address.getPort)).getOrElse((Actor.HOSTNAME, Actor.PORT)) + requestBuilder.setSourceHostname(host) + requestBuilder.setSourcePort(port) + } + RemoteProtocolBuilder.setMessage(message, requestBuilder) + remoteClient.send[Any](requestBuilder.build, None) + } + + override def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( + message: Any, + timeout: Long, + senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { + val requestBuilder = RemoteRequest.newBuilder + .setId(RemoteRequestIdFactory.nextId) + .setTarget(className) + .setTimeout(timeout) + .setUuid(uuid) + .setIsActor(true) + .setIsOneWay(false) + .setIsEscaped(false) + RemoteProtocolBuilder.setMessage(message, requestBuilder) + val future = remoteClient.send(requestBuilder.build, senderFuture) + if (future.isDefined) future.get + else throw new IllegalStateException("Expected a future from remote call to actor " + toString) + } + + def receive = {case _ => {}} +} + /** * @author Jonas Bonér */ @@ -62,53 +127,8 @@ object RemoteClient extends Logging { def actorFor(className: String, timeout: Long, hostname: String, port: Int): ActorID = actorFor(className, className, timeout, hostname, port) - def actorFor(actorId: String, className: String, timeout: Long, hostname: String, port: Int): ActorID = new ActorID(() => - new Actor { - start - val remoteClient = RemoteClient.clientFor(hostname, port) - - override def postMessageToMailbox(message: Any, sender: Option[ActorID]): Unit = { - val requestBuilder = RemoteRequest.newBuilder - .setId(RemoteRequestIdFactory.nextId) - .setTarget(className) - .setTimeout(timeout) - .setUuid(actorId) - .setIsActor(true) - .setIsOneWay(true) - .setIsEscaped(false) - if (sender.isDefined) { - val sndr = sender.get.actor - requestBuilder.setSourceTarget(sndr.getClass.getName) - requestBuilder.setSourceUuid(sndr.uuid) - val (host, port) = sndr._replyToAddress.map(a => (a.getHostName, a.getPort)).getOrElse((Actor.HOSTNAME, Actor.PORT)) - requestBuilder.setSourceHostname(host) - requestBuilder.setSourcePort(port) - } - RemoteProtocolBuilder.setMessage(message, requestBuilder) - remoteClient.send[Any](requestBuilder.build, None) - } - - override def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( - message: Any, - timeout: Long, - senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { - val requestBuilder = RemoteRequest.newBuilder - .setId(RemoteRequestIdFactory.nextId) - .setTarget(className) - .setTimeout(timeout) - .setUuid(actorId) - .setIsActor(true) - .setIsOneWay(false) - .setIsEscaped(false) - RemoteProtocolBuilder.setMessage(message, requestBuilder) - val future = remoteClient.send(requestBuilder.build, senderFuture) - if (future.isDefined) future.get - else throw new IllegalStateException("Expected a future from remote call to actor " + toString) - } - - def receive = {case _ => {}} - } - ) + def actorFor(actorId: String, className: String, timeout: Long, hostname: String, port: Int): ActorID = + RemoteActorProxy(actorId, className, hostname, port, timeout) def clientFor(hostname: String, port: Int): RemoteClient = clientFor(new InetSocketAddress(hostname, port)) diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 64e72f3fe8..f84a7a9ad9 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -168,7 +168,8 @@ class RemoteServer extends Logging { log.info("Starting remote server at [%s:%s]", hostname, port) RemoteServer.register(hostname, port, this) val remoteActorSet = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)) - val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, loader, remoteActorSet.actors, remoteActorSet.activeObjects) + val pipelineFactory = new RemoteServerPipelineFactory( + name, openChannels, loader, remoteActorSet.actors, remoteActorSet.activeObjects) bootstrap.setPipelineFactory(pipelineFactory) bootstrap.setOption("child.tcpNoDelay", true) bootstrap.setOption("child.keepAlive", true) @@ -200,7 +201,7 @@ class RemoteServer extends Logging { */ def register(actor: ActorID) = synchronized { if (_isRunning) { - log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, actor.id) + log.info("Registering server side remote actor [%s] with id [%s]", actor.actorClass.getName, actor.id) RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actor.id, actor) } } @@ -210,10 +211,22 @@ class RemoteServer extends Logging { */ def register(id: String, actor: ActorID) = synchronized { if (_isRunning) { - log.info("Registering server side remote actor [%s] with id [%s]", actor.getClass.getName, id) + log.info("Registering server side remote actor [%s] with id [%s]", actor.actorClass.getName, id) RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(id, actor) } } + + /** + * Unregister Remote Actor. + */ + def unregister(actor: ActorID) = synchronized { + if (_isRunning) { + log.info("Unregistering server side remote actor [%s] with id [%s]", actor.actorClass.getName, actor.id) + val server = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)) + server.actors.put(actor.id, actor) + if (actor.actor._registeredInRemoteNodeDuringSerialization) server.actors.remove(actor.uuid) + } + } } case class Codec(encoder : ChannelHandler, decoder : ChannelHandler) diff --git a/akka-util-java/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.java b/akka-util-java/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.java index f8ee893393..724978ef12 100644 --- a/akka-util-java/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.java +++ b/akka-util-java/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.java @@ -7,6 +7,443 @@ public final class RemoteProtocol { public static void registerAllExtensions( com.google.protobuf.ExtensionRegistry registry) { } + public static final class ActorRef extends + com.google.protobuf.GeneratedMessage { + // Use ActorRef.newBuilder() to construct. + private ActorRef() {} + + private static final ActorRef defaultInstance = new ActorRef(); + public static ActorRef getDefaultInstance() { + return defaultInstance; + } + + public ActorRef getDefaultInstanceForType() { + return defaultInstance; + } + + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRef_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRef_fieldAccessorTable; + } + + // required string uuid = 1; + public static final int UUID_FIELD_NUMBER = 1; + private boolean hasUuid; + private java.lang.String uuid_ = ""; + public boolean hasUuid() { return hasUuid; } + public java.lang.String getUuid() { return uuid_; } + + // required string actorClassName = 2; + public static final int ACTORCLASSNAME_FIELD_NUMBER = 2; + private boolean hasActorClassName; + private java.lang.String actorClassName_ = ""; + public boolean hasActorClassName() { return hasActorClassName; } + public java.lang.String getActorClassName() { return actorClassName_; } + + // required string sourceHostname = 3; + public static final int SOURCEHOSTNAME_FIELD_NUMBER = 3; + private boolean hasSourceHostname; + private java.lang.String sourceHostname_ = ""; + public boolean hasSourceHostname() { return hasSourceHostname; } + public java.lang.String getSourceHostname() { return sourceHostname_; } + + // required uint32 sourcePort = 4; + public static final int SOURCEPORT_FIELD_NUMBER = 4; + private boolean hasSourcePort; + private int sourcePort_ = 0; + public boolean hasSourcePort() { return hasSourcePort; } + public int getSourcePort() { return sourcePort_; } + + // required uint64 timeout = 5; + public static final int TIMEOUT_FIELD_NUMBER = 5; + private boolean hasTimeout; + private long timeout_ = 0L; + public boolean hasTimeout() { return hasTimeout; } + public long getTimeout() { return timeout_; } + + public final boolean isInitialized() { + if (!hasUuid) return false; + if (!hasActorClassName) return false; + if (!hasSourceHostname) return false; + if (!hasSourcePort) return false; + if (!hasTimeout) return false; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + if (hasUuid()) { + output.writeString(1, getUuid()); + } + if (hasActorClassName()) { + output.writeString(2, getActorClassName()); + } + if (hasSourceHostname()) { + output.writeString(3, getSourceHostname()); + } + if (hasSourcePort()) { + output.writeUInt32(4, getSourcePort()); + } + if (hasTimeout()) { + output.writeUInt64(5, getTimeout()); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (hasUuid()) { + size += com.google.protobuf.CodedOutputStream + .computeStringSize(1, getUuid()); + } + if (hasActorClassName()) { + size += com.google.protobuf.CodedOutputStream + .computeStringSize(2, getActorClassName()); + } + if (hasSourceHostname()) { + size += com.google.protobuf.CodedOutputStream + .computeStringSize(3, getSourceHostname()); + } + if (hasSourcePort()) { + size += com.google.protobuf.CodedOutputStream + .computeUInt32Size(4, getSourcePort()); + } + if (hasTimeout()) { + size += com.google.protobuf.CodedOutputStream + .computeUInt64Size(5, getTimeout()); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeDelimitedFrom(input).buildParsed(); + } + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeDelimitedFrom(input, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder { + private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef result; + + // Construct using se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef.newBuilder() + private Builder() {} + + private static Builder create() { + Builder builder = new Builder(); + builder.result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef(); + return builder; + } + + protected se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef internalGetResult() { + return result; + } + + public Builder clear() { + if (result == null) { + throw new IllegalStateException( + "Cannot call clear() after build()."); + } + result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef(); + return this; + } + + public Builder clone() { + return create().mergeFrom(result); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef.getDescriptor(); + } + + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef getDefaultInstanceForType() { + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef.getDefaultInstance(); + } + + public boolean isInitialized() { + return result.isInitialized(); + } + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef build() { + if (result != null && !isInitialized()) { + throw newUninitializedMessageException(result); + } + return buildPartial(); + } + + private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + if (!isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return buildPartial(); + } + + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef buildPartial() { + if (result == null) { + throw new IllegalStateException( + "build() has already been called on this Builder."); + } + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef returnMe = result; + result = null; + return returnMe; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef) { + return mergeFrom((se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef other) { + if (other == se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef.getDefaultInstance()) return this; + if (other.hasUuid()) { + setUuid(other.getUuid()); + } + if (other.hasActorClassName()) { + setActorClassName(other.getActorClassName()); + } + if (other.hasSourceHostname()) { + setSourceHostname(other.getSourceHostname()); + } + if (other.hasSourcePort()) { + setSourcePort(other.getSourcePort()); + } + if (other.hasTimeout()) { + setTimeout(other.getTimeout()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder( + this.getUnknownFields()); + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + this.setUnknownFields(unknownFields.build()); + return this; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + this.setUnknownFields(unknownFields.build()); + return this; + } + break; + } + case 10: { + setUuid(input.readString()); + break; + } + case 18: { + setActorClassName(input.readString()); + break; + } + case 26: { + setSourceHostname(input.readString()); + break; + } + case 32: { + setSourcePort(input.readUInt32()); + break; + } + case 40: { + setTimeout(input.readUInt64()); + break; + } + } + } + } + + + // required string uuid = 1; + public boolean hasUuid() { + return result.hasUuid(); + } + public java.lang.String getUuid() { + return result.getUuid(); + } + public Builder setUuid(java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + result.hasUuid = true; + result.uuid_ = value; + return this; + } + public Builder clearUuid() { + result.hasUuid = false; + result.uuid_ = getDefaultInstance().getUuid(); + return this; + } + + // required string actorClassName = 2; + public boolean hasActorClassName() { + return result.hasActorClassName(); + } + public java.lang.String getActorClassName() { + return result.getActorClassName(); + } + public Builder setActorClassName(java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + result.hasActorClassName = true; + result.actorClassName_ = value; + return this; + } + public Builder clearActorClassName() { + result.hasActorClassName = false; + result.actorClassName_ = getDefaultInstance().getActorClassName(); + return this; + } + + // required string sourceHostname = 3; + public boolean hasSourceHostname() { + return result.hasSourceHostname(); + } + public java.lang.String getSourceHostname() { + return result.getSourceHostname(); + } + public Builder setSourceHostname(java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + result.hasSourceHostname = true; + result.sourceHostname_ = value; + return this; + } + public Builder clearSourceHostname() { + result.hasSourceHostname = false; + result.sourceHostname_ = getDefaultInstance().getSourceHostname(); + return this; + } + + // required uint32 sourcePort = 4; + public boolean hasSourcePort() { + return result.hasSourcePort(); + } + public int getSourcePort() { + return result.getSourcePort(); + } + public Builder setSourcePort(int value) { + result.hasSourcePort = true; + result.sourcePort_ = value; + return this; + } + public Builder clearSourcePort() { + result.hasSourcePort = false; + result.sourcePort_ = 0; + return this; + } + + // required uint64 timeout = 5; + public boolean hasTimeout() { + return result.hasTimeout(); + } + public long getTimeout() { + return result.getTimeout(); + } + public Builder setTimeout(long value) { + result.hasTimeout = true; + result.timeout_ = value; + return this; + } + public Builder clearTimeout() { + result.hasTimeout = false; + result.timeout_ = 0L; + return this; + } + } + + static { + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.getDescriptor(); + } + + static { + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internalForceInit(); + } + } + public static final class RemoteRequest extends com.google.protobuf.GeneratedMessage { // Use RemoteRequest.newBuilder() to construct. @@ -1450,6 +1887,11 @@ public final class RemoteProtocol { } } + private static com.google.protobuf.Descriptors.Descriptor + internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRef_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRef_fieldAccessorTable; private static com.google.protobuf.Descriptors.Descriptor internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequest_descriptor; private static @@ -1471,27 +1913,38 @@ public final class RemoteProtocol { java.lang.String[] descriptorData = { "\n>se/scalablesolutions/akka/remote/proto" + "buf/RemoteProtocol.proto\022)se.scalablesol" + - "utions.akka.remote.protobuf\"\272\002\n\rRemoteRe" + - "quest\022\n\n\002id\030\001 \002(\004\022\020\n\010protocol\030\002 \002(\r\022\017\n\007m" + - "essage\030\003 \002(\014\022\027\n\017messageManifest\030\004 \001(\014\022\016\n" + - "\006method\030\005 \001(\t\022\016\n\006target\030\006 \002(\t\022\014\n\004uuid\030\007 " + - "\002(\t\022\017\n\007timeout\030\010 \002(\004\022\026\n\016supervisorUuid\030\t" + - " \001(\t\022\017\n\007isActor\030\n \002(\010\022\020\n\010isOneWay\030\013 \002(\010\022" + - "\021\n\tisEscaped\030\014 \002(\010\022\026\n\016sourceHostname\030\r \001" + - "(\t\022\022\n\nsourcePort\030\016 \001(\r\022\024\n\014sourceTarget\030\017", - " \001(\t\022\022\n\nsourceUuid\030\020 \001(\t\"\247\001\n\013RemoteReply" + - "\022\n\n\002id\030\001 \002(\004\022\020\n\010protocol\030\002 \001(\r\022\017\n\007messag" + - "e\030\003 \001(\014\022\027\n\017messageManifest\030\004 \001(\014\022\021\n\texce" + - "ption\030\005 \001(\t\022\026\n\016supervisorUuid\030\006 \001(\t\022\017\n\007i" + - "sActor\030\007 \002(\010\022\024\n\014isSuccessful\030\010 \002(\010" + "utions.akka.remote.protobuf\"m\n\010ActorRef\022" + + "\014\n\004uuid\030\001 \002(\t\022\026\n\016actorClassName\030\002 \002(\t\022\026\n" + + "\016sourceHostname\030\003 \002(\t\022\022\n\nsourcePort\030\004 \002(" + + "\r\022\017\n\007timeout\030\005 \002(\004\"\272\002\n\rRemoteRequest\022\n\n\002" + + "id\030\001 \002(\004\022\020\n\010protocol\030\002 \002(\r\022\017\n\007message\030\003 " + + "\002(\014\022\027\n\017messageManifest\030\004 \001(\014\022\016\n\006method\030\005" + + " \001(\t\022\016\n\006target\030\006 \002(\t\022\014\n\004uuid\030\007 \002(\t\022\017\n\007ti" + + "meout\030\010 \002(\004\022\026\n\016supervisorUuid\030\t \001(\t\022\017\n\007i", + "sActor\030\n \002(\010\022\020\n\010isOneWay\030\013 \002(\010\022\021\n\tisEsca" + + "ped\030\014 \002(\010\022\026\n\016sourceHostname\030\r \001(\t\022\022\n\nsou" + + "rcePort\030\016 \001(\r\022\024\n\014sourceTarget\030\017 \001(\t\022\022\n\ns" + + "ourceUuid\030\020 \001(\t\"\247\001\n\013RemoteReply\022\n\n\002id\030\001 " + + "\002(\004\022\020\n\010protocol\030\002 \001(\r\022\017\n\007message\030\003 \001(\014\022\027" + + "\n\017messageManifest\030\004 \001(\014\022\021\n\texception\030\005 \001" + + "(\t\022\026\n\016supervisorUuid\030\006 \001(\t\022\017\n\007isActor\030\007 " + + "\002(\010\022\024\n\014isSuccessful\030\010 \002(\010" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { public com.google.protobuf.ExtensionRegistry assignDescriptors( com.google.protobuf.Descriptors.FileDescriptor root) { descriptor = root; - internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequest_descriptor = + internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRef_descriptor = getDescriptor().getMessageTypes().get(0); + internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRef_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRef_descriptor, + new java.lang.String[] { "Uuid", "ActorClassName", "SourceHostname", "SourcePort", "Timeout", }, + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef.class, + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef.Builder.class); + internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequest_descriptor = + getDescriptor().getMessageTypes().get(1); internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequest_descriptor, @@ -1499,7 +1952,7 @@ public final class RemoteProtocol { se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest.class, se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest.Builder.class); internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReply_descriptor = - getDescriptor().getMessageTypes().get(1); + getDescriptor().getMessageTypes().get(2); internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReply_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReply_descriptor, diff --git a/akka-util-java/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.proto b/akka-util-java/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.proto index 79885bd043..c2ccef040d 100644 --- a/akka-util-java/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.proto +++ b/akka-util-java/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.proto @@ -10,6 +10,14 @@ package se.scalablesolutions.akka.remote.protobuf; protoc se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.proto --java_out . */ +message ActorRef { + required string uuid = 1; + required string actorClassName = 2; + required string sourceHostname = 3; + required uint32 sourcePort = 4; + required uint64 timeout = 5; +} + message RemoteRequest { required uint64 id = 1; required uint32 protocol = 2; From a820b6f0ae9c1f0e694af7cdc8ecd31e4e8d1806 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Wed, 5 May 2010 22:48:30 +0200 Subject: [PATCH 2/3] converted tabs to spaces --- akka-core/src/main/scala/actor/Actor.scala | 15 ++++++++++----- .../src/main/scala/stm/TransactionalState.scala | 2 +- .../src/test/scala/TransactionalRefSpec.scala | 6 +++--- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 321095d84c..756905680e 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -230,18 +230,23 @@ object Actor extends Logging { } } - /** Starts the specified actor and returns it, useful for: - *
val actor = new FooActor
+  /** 
+   * Starts the specified actor and returns it, useful for simplifying code such as:
+   * 
+   *  val actor = new FooActor
    *  actor.start
-   *  //Gets replaced by
+   * 
+ * can be replaced with: + *
+   *  import Actor._
+   *
    *  val actor = start(new FooActor)
-   *  
+ *
*/ def start[T <: Actor](actor : T) : T = { actor.start actor } - } /** diff --git a/akka-core/src/main/scala/stm/TransactionalState.scala b/akka-core/src/main/scala/stm/TransactionalState.scala index f870af75e6..67f0615458 100644 --- a/akka-core/src/main/scala/stm/TransactionalState.scala +++ b/akka-core/src/main/scala/stm/TransactionalState.scala @@ -163,7 +163,7 @@ class TransactionalRef[T](initialOpt: Option[T] = None) extends Transactional { * Necessary to keep from being implicitly converted to Iterable in for comprehensions. */ def withFilter(p: T => Boolean): WithFilter = new WithFilter(p) - + class WithFilter(p: T => Boolean) { def map[B](f: T => B): TransactionalRef[B] = self filter p map f def flatMap[B](f: T => TransactionalRef[B]): TransactionalRef[B] = self filter p flatMap f diff --git a/akka-core/src/test/scala/TransactionalRefSpec.scala b/akka-core/src/test/scala/TransactionalRefSpec.scala index 897e9583e0..14a0a8f186 100644 --- a/akka-core/src/test/scala/TransactionalRefSpec.scala +++ b/akka-core/src/test/scala/TransactionalRefSpec.scala @@ -98,9 +98,9 @@ class TransactionalRefSpec extends Spec with ShouldMatchers { var result = 0 atomic { - for (value <- ref) { - result += value - } + for (value <- ref) { + result += value + } } result should be(3) From fb3ae7ed2bc82d2d1bc9af25154dd24c97272575 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Fri, 7 May 2010 11:19:19 +0200 Subject: [PATCH 3/3] Rewrite of remote protocol to use the new ActorRef protocol --- .../scala/service/ConsumerPublisher.scala | 16 +- .../src/main/scala/actor/ActiveObject.scala | 36 +- akka-core/src/main/scala/actor/Actor.scala | 163 +++--- .../src/main/scala/actor/ActorRegistry.scala | 26 +- .../src/main/scala/actor/Scheduler.scala | 6 +- .../src/main/scala/actor/Supervisor.scala | 18 +- .../main/scala/config/SupervisionConfig.scala | 12 +- ...actReactorBasedEventDrivenDispatcher.scala | 12 +- ...sedEventDrivenWorkStealingDispatcher.scala | 16 +- .../main/scala/dispatch/MessageHandling.scala | 6 +- .../src/main/scala/remote/RemoteClient.scala | 59 +-- .../scala/remote/RemoteProtocolBuilder.scala | 10 +- .../src/main/scala/remote/RemoteServer.scala | 107 ++-- akka-core/src/test/scala/FutureSpec.scala | 4 +- .../src/test/scala/MemoryFootprintSpec.scala | 36 -- .../ServerInitiatedRemoteActorSpec.scala | 2 +- .../akka/remote/protobuf/RemoteProtocol.java | 477 +++++++----------- .../akka/remote/protobuf/RemoteProtocol.proto | 11 +- 18 files changed, 435 insertions(+), 582 deletions(-) delete mode 100644 akka-core/src/test/scala/MemoryFootprintSpec.scala diff --git a/akka-camel/src/main/scala/service/ConsumerPublisher.scala b/akka-camel/src/main/scala/service/ConsumerPublisher.scala index 45aa0514f6..722f4e428e 100644 --- a/akka-camel/src/main/scala/service/ConsumerPublisher.scala +++ b/akka-camel/src/main/scala/service/ConsumerPublisher.scala @@ -121,15 +121,15 @@ object Publish { def forConsumer(actor: ActorRef): Option[Publish] = forConsumeAnnotated(actor) orElse forConsumerType(actor) - private def forConsumeAnnotated(actorId: ActorRef): Option[Publish] = { - val annotation = actorId.actorClass.getAnnotation(classOf[consume]) + private def forConsumeAnnotated(actorRef: ActorRef): Option[Publish] = { + val annotation = actorRef.actorClass.getAnnotation(classOf[consume]) if (annotation eq null) None - else if (actorId.remoteAddress.isDefined) None // do not publish proxies - else Some(Publish(annotation.value, actorId.id, false)) + else if (actorRef.remoteAddress.isDefined) None // do not publish proxies + else Some(Publish(annotation.value, actorRef.id, false)) } - private def forConsumerType(actorId: ActorRef): Option[Publish] = - if (!actorId.actor.isInstanceOf[Consumer]) None - else if (actorId.remoteAddress.isDefined) None - else Some(Publish(actorId.actor.asInstanceOf[Consumer].endpointUri, actorId.uuid, true)) + private def forConsumerType(actorRef: ActorRef): Option[Publish] = + if (!actorRef.actor.isInstanceOf[Consumer]) None + else if (actorRef.remoteAddress.isDefined) None + else Some(Publish(actorRef.actor.asInstanceOf[Consumer].endpointUri, actorRef.uuid, true)) } diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala index 187d770a07..7a81953403 100644 --- a/akka-core/src/main/scala/actor/ActiveObject.scala +++ b/akka-core/src/main/scala/actor/ActiveObject.scala @@ -5,8 +5,8 @@ package se.scalablesolutions.akka.actor import se.scalablesolutions.akka.config.FaultHandlingStrategy -import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest -import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory} +import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol +import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestProtocolIdFactory} import se.scalablesolutions.akka.dispatch.{MessageDispatcher, Future} import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.serialization.Serializer @@ -284,9 +284,9 @@ object ActiveObject { actor.initialize(target, proxy) actor.timeout = timeout if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get) - val actorId = new ActorRef(() => actor) - AspectInitRegistry.register(proxy, AspectInit(target, actorId, remoteAddress, timeout)) - actorId.start + val actorRef = new ActorRef(() => actor) + AspectInitRegistry.register(proxy, AspectInit(target, actorRef, remoteAddress, timeout)) + actorRef.start proxy.asInstanceOf[T] } @@ -295,9 +295,9 @@ object ActiveObject { actor.initialize(target.getClass, target) actor.timeout = timeout if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get) - val actorId = new ActorRef(() => actor) - AspectInitRegistry.register(proxy, AspectInit(intf, actorId, remoteAddress, timeout)) - actorId.start + val actorRef = new ActorRef(() => actor) + AspectInitRegistry.register(proxy, AspectInit(intf, actorRef, remoteAddress, timeout)) + actorRef.start proxy.asInstanceOf[T] } @@ -388,10 +388,10 @@ private[akka] object AspectInitRegistry { private[akka] sealed case class AspectInit( val target: Class[_], - val actorId: ActorRef, + val actorRef: ActorRef, val remoteAddress: Option[InetSocketAddress], val timeout: Long) { - def this(target: Class[_], actorId: ActorRef, timeout: Long) = this(target, actorId, None, timeout) + def this(target: Class[_], actorRef: ActorRef, timeout: Long) = this(target, actorRef, None, timeout) } /** @@ -405,7 +405,7 @@ private[akka] sealed case class AspectInit( private[akka] sealed class ActiveObjectAspect { @volatile private var isInitialized = false private var target: Class[_] = _ - private var actorId: ActorRef = _ + private var actorRef: ActorRef = _ private var remoteAddress: Option[InetSocketAddress] = _ private var timeout: Long = _ @@ -414,7 +414,7 @@ private[akka] sealed class ActiveObjectAspect { if (!isInitialized) { val init = AspectInitRegistry.initFor(joinPoint.getThis) target = init.target - actorId = init.actorId + actorRef = init.actorRef remoteAddress = init.remoteAddress timeout = init.timeout isInitialized = true @@ -430,10 +430,10 @@ private[akka] sealed class ActiveObjectAspect { private def localDispatch(joinPoint: JoinPoint): AnyRef = { val rtti = joinPoint.getRtti.asInstanceOf[MethodRtti] if (isOneWay(rtti)) { - (actorId ! Invocation(joinPoint, true, true) ).asInstanceOf[AnyRef] + (actorRef ! Invocation(joinPoint, true, true) ).asInstanceOf[AnyRef] } else { - val result = actorId !! (Invocation(joinPoint, false, isVoid(rtti)), timeout) + val result = actorRef !! (Invocation(joinPoint, false, isVoid(rtti)), timeout) if (result.isDefined) result.get else throw new IllegalStateException("No result defined for invocation [" + joinPoint + "]") } @@ -443,17 +443,17 @@ private[akka] sealed class ActiveObjectAspect { val rtti = joinPoint.getRtti.asInstanceOf[MethodRtti] val oneWay_? = isOneWay(rtti) || isVoid(rtti) val (message: Array[AnyRef], isEscaped) = escapeArguments(rtti.getParameterValues) - val requestBuilder = RemoteRequest.newBuilder - .setId(RemoteRequestIdFactory.nextId) + val requestBuilder = RemoteRequestProtocol.newBuilder + .setId(RemoteRequestProtocolIdFactory.nextId) .setMethod(rtti.getMethod.getName) .setTarget(target.getName) - .setUuid(actorId.uuid) + .setUuid(actorRef.uuid) .setTimeout(timeout) .setIsActor(false) .setIsOneWay(oneWay_?) .setIsEscaped(false) RemoteProtocolBuilder.setMessage(message, requestBuilder) - val id = actorId.actor.registerSupervisorAsRemoteActor + val id = actorRef.actor.registerSupervisorAsRemoteActor if (id.isDefined) requestBuilder.setSupervisorUuid(id.get) val remoteMessage = requestBuilder.build val future = RemoteClient.clientFor(remoteAddress.get).send(remoteMessage, None) diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 3d6332f729..76fcfa0f15 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -11,12 +11,10 @@ import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.stm.Transaction.Global._ import se.scalablesolutions.akka.stm.TransactionManagement._ import se.scalablesolutions.akka.stm.TransactionManagement -import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol -import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer, RemoteClient, - RemoteActorProxy, RemoteProtocolBuilder, - RemoteRequestIdFactory} import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.util.{HashCode, Logging, UUID} +import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol +import se.scalablesolutions.akka.remote.{RemoteNode, RemoteServer, RemoteClient, RemoteActorProxy, RemoteProtocolBuilder, RemoteRequestProtocolIdFactory} import org.multiverse.api.ThreadLocalTransaction._ import org.multiverse.commitbarriers.CountDownCommitBarrier @@ -250,31 +248,32 @@ object Actor extends Logging { } /** - * The ActorID object can be used to create ActorID instances out of its binary + * The ActorRef object can be used to create ActorRef instances out of its binary * protobuf based representation. *
- *   val actorRef = ActorID.fromBinary(bytes)
+ *   val actorRef = ActorRef.fromBinary(bytes)
  *   actorRef ! message // send message to remote actor through its reference
  * 
* * @author Jonas Bonér */ -object ActorID { - def fromBinary(bytes: Array[Byte]): ActorID = { - val actorRefProto = RemoteProtocol.ActorRef.newBuilder.mergeFrom(bytes).build +object ActorRef { + def fromBinary(bytes: Array[Byte]): ActorRef = + fromProtocol(RemoteProtocol.ActorRefProtocol.newBuilder.mergeFrom(bytes).build) + + def fromProtocol(protocol: RemoteProtocol.ActorRefProtocol): ActorRef = RemoteActorProxy( - actorRefProto.getUuid, - actorRefProto.getActorClassName, - actorRefProto.getSourceHostname, - actorRefProto.getSourcePort, - actorRefProto.getTimeout) - } + protocol.getUuid, + protocol.getActorClassName, + protocol.getSourceHostname, + protocol.getSourcePort, + protocol.getTimeout) } /** - * ActorID is an immutable and serializable handle to an Actor. + * ActorRef is an immutable and serializable handle to an Actor. *

- * Create an ActorID for an Actor by using the factory method on the Actor object. + * Create an ActorRef for an Actor by using the factory method on the Actor object. *

* Here is an example on how to create an actor with a default constructor. *

@@ -297,6 +296,7 @@ object ActorID {
  * 
  * @author Jonas Bonér
  */
+final class ActorRef private[akka] () {
   private[akka] var actorFactory: Either[Option[Class[_ <: Actor]], Option[() => Actor]] = Left(None)
 
   private[akka] lazy val actor: Actor = {
@@ -329,20 +329,29 @@ object ActorID {
     actorFactory = Right(Some(factory))
   }
   
-  def toBinary: Array[Byte] = {
+  def toProtocol: RemoteProtocol.ActorRefProtocol = {
+    val (host, port) = actor._replyToAddress.map(address => 
+      (address.getHostName, address.getPort))
+      .getOrElse((Actor.HOSTNAME, Actor.PORT))
+
     if (!actor._registeredInRemoteNodeDuringSerialization) { 
-      RemoteNode.register(uuid, this)
+      Actor.log.debug("Register serialized Actor [%s] as remote @ [%s:%s]", actorClass.getName, host, port)
+      if (RemoteServer.serverFor(host, port).isEmpty) (new RemoteServer).start(host, port)
+      RemoteServer.actorsFor(RemoteServer.Address(host, port)).actors.put(uuid, this)
       actor._registeredInRemoteNodeDuringSerialization = true
     }
-    RemoteProtocol.ActorRef.newBuilder
+    
+    RemoteProtocol.ActorRefProtocol.newBuilder
       .setUuid(uuid)
       .setActorClassName(actorClass.getName)
-      .setSourceHostname(RemoteServer.HOSTNAME)
-      .setSourcePort(RemoteServer.PORT)
+      .setSourceHostname(host)
+      .setSourcePort(port)
       .setTimeout(timeout)
-      .build.toByteArray
+      .build
   }
   
+  def toBinary: Array[Byte] = toProtocol.toByteArray
+  
   /**
    * Returns the class for the Actor instance that is managed by the ActorRef.
    */
@@ -450,7 +459,7 @@ object ActorID {
    * If you are sending messages using !! then you have to use reply(..)
    * to send a reply message to the original sender. If not then the sender will block until the timeout expires.
    */
-  def !![T](message: Any): Option[T] = !![T](message, actor.timeout)
+  def !![T](message: Any)(implicit sender: Option[ActorRef] = None): Option[T] = !![T](message, actor.timeout)
 
   /**
    * Sends a message asynchronously returns a future holding the eventual reply message.
@@ -478,7 +487,7 @@ object ActorID {
     if (actor.isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages")
     if (actor.isRunning) {
       sender.get.actor.replyTo match {
-        case Some(Left(actorID)) => actor.postMessageToMailbox(message, Some(actorID))
+        case Some(Left(actorRef)) => actor.postMessageToMailbox(message, Some(actorRef))
         case Some(Right(future)) => actor.postMessageToMailboxAndCreateFutureResultWithTimeout(message, actor.timeout, Some(future))
         case _                   => throw new IllegalStateException("Can't forward message when initial sender is not an actor")
       }
@@ -599,7 +608,7 @@ trait Actor extends TransactionManagement with Logging {
   @volatile private[this] var _isSuspended = true
   @volatile private[this] var _isShutDown = false
   @volatile private[akka] var _isKilled = false
-  @volatile private[akka] var _registeredInRemoteNodeDuringSerialization = true
+  @volatile private[akka] var _registeredInRemoteNodeDuringSerialization = false
   private var _hotswap: Option[PartialFunction[Any, Unit]] = None
   private[akka] var _remoteAddress: Option[InetSocketAddress] = None
   private[akka] var _linkedActors: Option[HashSet[ActorRef]] = None
@@ -904,7 +913,8 @@ trait Actor extends TransactionManagement with Logging {
 
 
   /**
-   * Set the contact address for this actor. This is used for replying to messages sent asynchronously when no reply channel exists.
+   * Set the contact address for this actor. This is used for replying to messages sent 
+   * asynchronously when no reply channel exists.
    */
   def setReplyToAddress(hostname: String, port: Int): Unit = setReplyToAddress(new InetSocketAddress(hostname, port))
 
@@ -934,12 +944,12 @@ trait Actor extends TransactionManagement with Logging {
    * 

* To be invoked from within the actor itself. */ - protected[this] def link(actorId: ActorRef) = { - if (actorId.supervisor.isDefined) throw new IllegalStateException( - "Actor can only have one supervisor [" + actorId + "], e.g. link(actor) fails") - getLinkedActors.add(actorId) - actorId.supervisor = Some(self) - Actor.log.debug("Linking actor [%s] to actor [%s]", actorId, this) + protected[this] def link(actorRef: ActorRef) = { + if (actorRef.supervisor.isDefined) throw new IllegalStateException( + "Actor can only have one supervisor [" + actorRef + "], e.g. link(actor) fails") + getLinkedActors.add(actorRef) + actorRef.supervisor = Some(self) + Actor.log.debug("Linking actor [%s] to actor [%s]", actorRef, this) } /** @@ -947,12 +957,12 @@ trait Actor extends TransactionManagement with Logging { *

* To be invoked from within the actor itself. */ - protected[this] def unlink(actorId: ActorRef) = { - if (!getLinkedActors.contains(actorId)) throw new IllegalStateException( - "Actor [" + actorId + "] is not a linked actor, can't unlink") - getLinkedActors.remove(actorId) - actorId.supervisor = None - Actor.log.debug("Unlinking actor [%s] from actor [%s]", actorId, this) + protected[this] def unlink(actorRef: ActorRef) = { + if (!getLinkedActors.contains(actorRef)) throw new IllegalStateException( + "Actor [" + actorRef + "] is not a linked actor, can't unlink") + getLinkedActors.remove(actorRef) + actorRef.supervisor = None + Actor.log.debug("Unlinking actor [%s] from actor [%s]", actorRef, this) } /** @@ -960,11 +970,11 @@ trait Actor extends TransactionManagement with Logging { *

* To be invoked from within the actor itself. */ - protected[this] def startLink(actorId: ActorRef) = { + protected[this] def startLink(actorRef: ActorRef) = { try { - actorId.start + actorRef.start } finally { - link(actorId) + link(actorRef) } } @@ -973,12 +983,12 @@ trait Actor extends TransactionManagement with Logging { *

* To be invoked from within the actor itself. */ - protected[this] def startLinkRemote(actorId: ActorRef, hostname: String, port: Int) = { + protected[this] def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int) = { try { - actorId.makeRemote(hostname, port) - actorId.start + actorRef.makeRemote(hostname, port) + actorRef.start } finally { - link(actorId) + link(actorRef) } } @@ -988,9 +998,9 @@ trait Actor extends TransactionManagement with Logging { * To be invoked from within the actor itself. */ protected[this] def spawn[T <: Actor : Manifest]: ActorRef = { - val actorId = spawnButDoNotStart[T] - actorId.start - actorId + val actorRef = spawnButDoNotStart[T] + actorRef.start + actorRef } /** @@ -1062,15 +1072,15 @@ trait Actor extends TransactionManagement with Logging { new ActorRef(() => actor) } - protected[akka] def postMessageToMailbox(message: Any, sender: Option[ActorRef]): Unit = { + protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = { joinTransaction(message) if (_remoteAddress.isDefined) { - val requestBuilder = RemoteProtocol.RemoteRequest.newBuilder - .setId(RemoteRequestIdFactory.nextId) + val requestBuilder = RemoteProtocol.RemoteRequestProtocol.newBuilder + .setId(RemoteRequestProtocolIdFactory.nextId) .setTarget(this.getClass.getName) .setTimeout(this.timeout) - .setUuid(this.id) + .setUuid(this.uuid) .setIsActor(true) .setIsOneWay(true) .setIsEscaped(false) @@ -1078,30 +1088,12 @@ trait Actor extends TransactionManagement with Logging { val id = registerSupervisorAsRemoteActor if (id.isDefined) requestBuilder.setSupervisorUuid(id.get) - // set the source fields used to reply back to the original sender - // (i.e. not the remote proxy actor) - if (sender.isDefined) { - val s = sender.get.actor - requestBuilder.setSourceTarget(s.getClass.getName) - requestBuilder.setSourceUuid(s.uuid) + senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol)) - val (host, port) = s._replyToAddress.map(a => (a.getHostName, a.getPort)).getOrElse((Actor.HOSTNAME, Actor.PORT)) - - Actor.log.debug("Setting sending actor as %s @ %s:%s", s.getClass.getName, host, port) - - requestBuilder.setSourceHostname(host) - requestBuilder.setSourcePort(port) - - if (RemoteServer.serverFor(host, port).isEmpty) { - val server = new RemoteServer - server.start(host, port) - } - RemoteServer.actorsFor(RemoteServer.Address(host, port)).actors.put(sender.get.id, sender.get) - } RemoteProtocolBuilder.setMessage(message, requestBuilder) RemoteClient.clientFor(_remoteAddress.get).send[Any](requestBuilder.build, None) } else { - val invocation = new MessageInvocation(self, message, sender.map(Left(_)), transactionSet.get) + val invocation = new MessageInvocation(self, message, senderOption.map(Left(_)), transactionSet.get) if (messageDispatcher.usesActorMailbox) { _mailbox.add(invocation) if (_isSuspended) invocation.send @@ -1117,17 +1109,21 @@ trait Actor extends TransactionManagement with Logging { joinTransaction(message) if (_remoteAddress.isDefined) { - val requestBuilder = RemoteProtocol.RemoteRequest.newBuilder - .setId(RemoteRequestIdFactory.nextId) + val requestBuilder = RemoteProtocol.RemoteRequestProtocol.newBuilder + .setId(RemoteRequestProtocolIdFactory.nextId) .setTarget(this.getClass.getName) .setTimeout(this.timeout) - .setUuid(this.id) + .setUuid(this.uuid) .setIsActor(true) .setIsOneWay(false) .setIsEscaped(false) + + //senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol)) RemoteProtocolBuilder.setMessage(message, requestBuilder) + val id = registerSupervisorAsRemoteActor if (id.isDefined) requestBuilder.setSupervisorUuid(id.get) + val future = RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, senderFuture) if (future.isDefined) future.get else throw new IllegalStateException("Expected a future from remote call to actor " + toString) @@ -1136,10 +1132,7 @@ trait Actor extends TransactionManagement with Logging { else new DefaultCompletableFuture[T](timeout) val invocation = new MessageInvocation( self, message, Some(Right(future.asInstanceOf[CompletableFuture[Any]])), transactionSet.get) - - if (messageDispatcher.usesActorMailbox) - _mailbox.add(invocation) - + if (messageDispatcher.usesActorMailbox) _mailbox.add(invocation) invocation.send future } @@ -1273,8 +1266,8 @@ trait Actor extends TransactionManagement with Logging { private[this] def restartLinkedActors(reason: Throwable) = { getLinkedActors.toArray.toList.asInstanceOf[List[ActorRef]].foreach { - actorId => - val actor = actorId.actor + actorRef => + val actor = actorRef.actor if (actor.lifeCycle.isEmpty) actor.lifeCycle = Some(LifeCycle(Permanent)) actor.lifeCycle.get match { case LifeCycle(scope, _) => { @@ -1284,7 +1277,7 @@ trait Actor extends TransactionManagement with Logging { case Temporary => Actor.log.info("Actor [%s] configured as TEMPORARY and will not be restarted.", actor.id) actor.stop - getLinkedActors.remove(actorId) // remove the temporary actor + getLinkedActors.remove(actorRef) // remove the temporary actor // if last temporary actor is gone, then unlink me from supervisor if (getLinkedActors.isEmpty) { Actor.log.info("All linked actors have died permanently (they were all configured as TEMPORARY)" + @@ -1370,7 +1363,7 @@ object DispatcherType { * * @author Jonas Bonér */ -class ActorMessageInvoker private[akka] (val actorId: ActorRef) extends MessageInvoker { - def invoke(handle: MessageInvocation) = actorId.actor.invoke(handle) +class ActorMessageInvoker private[akka] (val actorRef: ActorRef) extends MessageInvoker { + def invoke(handle: MessageInvocation) = actorRef.actor.invoke(handle) } diff --git a/akka-core/src/main/scala/actor/ActorRegistry.scala b/akka-core/src/main/scala/actor/ActorRegistry.scala index c72c588937..c52c59b2ae 100644 --- a/akka-core/src/main/scala/actor/ActorRegistry.scala +++ b/akka-core/src/main/scala/actor/ActorRegistry.scala @@ -57,9 +57,9 @@ object ActorRegistry extends Logging { val all = new ListBuffer[ActorRef] val elements = actorsByUUID.elements while (elements.hasMoreElements) { - val actorId = elements.nextElement - if (manifest.erasure.isAssignableFrom(actorId.actor.getClass)) { - all += actorId + val actorRef = elements.nextElement + if (manifest.erasure.isAssignableFrom(actorRef.actor.getClass)) { + all += actorRef } } all.toList @@ -92,24 +92,24 @@ object ActorRegistry extends Logging { /** * Registers an actor in the ActorRegistry. */ - def register(actorId: ActorRef) = { + def register(actorRef: ActorRef) = { // UUID - actorsByUUID.put(actorId.uuid, actorId) + actorsByUUID.put(actorRef.uuid, actorRef) // ID - val id = actorId.id - if (id eq null) throw new IllegalStateException("Actor.id is null " + actorId) - if (actorsById.containsKey(id)) actorsById.put(id, actorId :: actorsById.get(id)) - else actorsById.put(id, actorId :: Nil) + val id = actorRef.id + if (id eq null) throw new IllegalStateException("Actor.id is null " + actorRef) + if (actorsById.containsKey(id)) actorsById.put(id, actorRef :: actorsById.get(id)) + else actorsById.put(id, actorRef :: Nil) // Class name - val className = actorId.actor.getClass.getName + val className = actorRef.actor.getClass.getName if (actorsByClassName.containsKey(className)) { - actorsByClassName.put(className, actorId :: actorsByClassName.get(className)) - } else actorsByClassName.put(className, actorId :: Nil) + actorsByClassName.put(className, actorRef :: actorsByClassName.get(className)) + } else actorsByClassName.put(className, actorRef :: Nil) // notify listeners - foreachListener(_ ! ActorRegistered(actorId)) + foreachListener(_ ! ActorRegistered(actorRef)) } /** diff --git a/akka-core/src/main/scala/actor/Scheduler.scala b/akka-core/src/main/scala/actor/Scheduler.scala index 905cd6c8fe..2b31bea60c 100644 --- a/akka-core/src/main/scala/actor/Scheduler.scala +++ b/akka-core/src/main/scala/actor/Scheduler.scala @@ -58,9 +58,9 @@ object Scheduler extends Actor { def restart = service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory) - def stopSupervising(actorId: ActorRef) = { - unlink(actorId) - schedulers.remove(actorId) + def stopSupervising(actorRef: ActorRef) = { + unlink(actorRef) + schedulers.remove(actorRef) } override def shutdown = { diff --git a/akka-core/src/main/scala/actor/Supervisor.scala b/akka-core/src/main/scala/actor/Supervisor.scala index f4b7b2b012..c0023ae44b 100644 --- a/akka-core/src/main/scala/actor/Supervisor.scala +++ b/akka-core/src/main/scala/actor/Supervisor.scala @@ -103,9 +103,9 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep override def stop = synchronized { super[Actor].stop - getLinkedActors.toArray.toList.asInstanceOf[List[ActorRef]].foreach { actorId => - actorId.stop - log.info("Shutting actor down: %s", actorId) + getLinkedActors.toArray.toList.asInstanceOf[List[ActorRef]].foreach { actorRef => + actorRef.stop + log.info("Shutting actor down: %s", actorRef) } log.info("Stopping supervisor: %s", this) } @@ -119,19 +119,19 @@ sealed class Supervisor private[akka] (handler: FaultHandlingStrategy, trapExcep case SupervisorConfig(_, servers) => servers.map(server => server match { - case Supervise(actorId, lifeCycle, remoteAddress) => - val className = actorId.actor.getClass.getName + case Supervise(actorRef, lifeCycle, remoteAddress) => + val className = actorRef.actor.getClass.getName val currentActors = { val list = actors.get(className) if (list eq null) List[ActorRef]() else list } - actors.put(className, actorId :: currentActors) - actorId.actor.lifeCycle = Some(lifeCycle) - startLink(actorId) + actors.put(className, actorRef :: currentActors) + actorRef.actor.lifeCycle = Some(lifeCycle) + startLink(actorRef) remoteAddress.foreach(address => RemoteServer.actorsFor( RemoteServer.Address(address.hostname, address.port)) - .actors.put(actorId.id, actorId)) + .actors.put(actorRef.id, actorRef)) case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration val supervisor = { diff --git a/akka-core/src/main/scala/config/SupervisionConfig.scala b/akka-core/src/main/scala/config/SupervisionConfig.scala index 7e1daa5935..7f1cd308b3 100644 --- a/akka-core/src/main/scala/config/SupervisionConfig.scala +++ b/akka-core/src/main/scala/config/SupervisionConfig.scala @@ -25,13 +25,13 @@ object ScalaConfig { case class SupervisorConfig(restartStrategy: RestartStrategy, worker: List[Server]) extends Server - class Supervise(val actorId: ActorRef, val lifeCycle: LifeCycle, _remoteAddress: RemoteAddress) extends Server { + class Supervise(val actorRef: ActorRef, val lifeCycle: LifeCycle, _remoteAddress: RemoteAddress) extends Server { val remoteAddress: Option[RemoteAddress] = if (_remoteAddress eq null) None else Some(_remoteAddress) } object Supervise { - def apply(actorId: ActorRef, lifeCycle: LifeCycle, remoteAddress: RemoteAddress) = new Supervise(actorId, lifeCycle, remoteAddress) - def apply(actorId: ActorRef, lifeCycle: LifeCycle) = new Supervise(actorId, lifeCycle, null) - def unapply(supervise: Supervise) = Some((supervise.actorId, supervise.lifeCycle, supervise.remoteAddress)) + def apply(actorRef: ActorRef, lifeCycle: LifeCycle, remoteAddress: RemoteAddress) = new Supervise(actorRef, lifeCycle, remoteAddress) + def apply(actorRef: ActorRef, lifeCycle: LifeCycle) = new Supervise(actorRef, lifeCycle, null) + def unapply(supervise: Supervise) = Some((supervise.actorRef, supervise.lifeCycle, supervise.remoteAddress)) } case class RestartStrategy( @@ -227,8 +227,8 @@ object JavaConfig { intf, target, lifeCycle.transform, timeout, transactionRequired, dispatcher, if (remoteAddress ne null) se.scalablesolutions.akka.config.ScalaConfig.RemoteAddress(remoteAddress.hostname, remoteAddress.port) else null) - def newSupervised(actorId: ActorRef) = - se.scalablesolutions.akka.config.ScalaConfig.Supervise(actorId, lifeCycle.transform) + def newSupervised(actorRef: ActorRef) = + se.scalablesolutions.akka.config.ScalaConfig.Supervise(actorRef, lifeCycle.transform) } } \ No newline at end of file diff --git a/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala index f5fd490df9..8d87272ef0 100644 --- a/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala @@ -18,14 +18,14 @@ abstract class AbstractReactorBasedEventDrivenDispatcher(val name: String) exten def dispatch(invocation: MessageInvocation) = queue.append(invocation) - override def register(actorId: ActorRef) = synchronized { - messageInvokers.put(actorId, new ActorMessageInvoker(actorId)) - super.register(actorId) + override def register(actorRef: ActorRef) = synchronized { + messageInvokers.put(actorRef, new ActorMessageInvoker(actorRef)) + super.register(actorRef) } - override def unregister(actorId: ActorRef) = synchronized { - messageInvokers.remove(actorId) - super.unregister(actorId) + override def unregister(actorRef: ActorRef) = synchronized { + messageInvokers.remove(actorRef) + super.unregister(actorRef) } def shutdown = if (active) { diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index 23adfed05f..4edf6651c0 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -31,7 +31,7 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef} class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends MessageDispatcher with ThreadPoolBuilder { @volatile private var active: Boolean = false - implicit def actorId2actor(actorId: ActorRef): Actor = actorId.actor + implicit def actorRef2actor(actorRef: ActorRef): Actor = actorRef.actor /** Type of the actors registered in this dispatcher. */ private var actorType:Option[Class[_]] = None @@ -193,15 +193,15 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool - override def register(actorId: ActorRef) = { - verifyActorsAreOfSameType(actorId) - pooledActors.add(actorId) - super.register(actorId) + override def register(actorRef: ActorRef) = { + verifyActorsAreOfSameType(actorRef) + pooledActors.add(actorRef) + super.register(actorRef) } - override def unregister(actorId: ActorRef) = { - pooledActors.remove(actorId) - super.unregister(actorId) + override def unregister(actorRef: ActorRef) = { + pooledActors.remove(actorRef) + super.unregister(actorRef) } def usesActorMailbox = true diff --git a/akka-core/src/main/scala/dispatch/MessageHandling.scala b/akka-core/src/main/scala/dispatch/MessageHandling.scala index 9d5c049495..6c9fc0f842 100644 --- a/akka-core/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-core/src/main/scala/dispatch/MessageHandling.scala @@ -60,9 +60,9 @@ trait MessageDispatcher extends Logging { def dispatch(invocation: MessageInvocation) def start def shutdown - def register(actorId: ActorRef) = references.put(actorId.uuid, actorId) - def unregister(actorId: ActorRef) = { - references.remove(actorId.uuid) + def register(actorRef: ActorRef) = references.put(actorRef.uuid, actorRef) + def unregister(actorRef: ActorRef) = { + references.remove(actorRef.uuid) if (canBeShutDown) shutdown // shut down in the dispatcher's references is zero } diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index f977260cf2..38e068b9a1 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -4,7 +4,7 @@ package se.scalablesolutions.akka.remote -import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply} +import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequestProtocol, RemoteReplyProtocol} import se.scalablesolutions.akka.actor.{Exit, Actor, ActorRef} import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture} import se.scalablesolutions.akka.util.{UUID, Logging} @@ -31,7 +31,7 @@ import scala.collection.mutable.{HashSet, HashMap} * * @author Jonas Bonér */ -object RemoteRequestIdFactory { +object RemoteRequestProtocolIdFactory { private val nodeId = UUID.newUuid private val id = new AtomicLong @@ -64,23 +64,15 @@ private[akka] class RemoteActorProxy private ( val remoteClient = RemoteClient.clientFor(hostname, port) override def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = { - val requestBuilder = RemoteRequest.newBuilder - .setId(RemoteRequestIdFactory.nextId) + val requestBuilder = RemoteRequestProtocol.newBuilder + .setId(RemoteRequestProtocolIdFactory.nextId) .setTarget(className) .setTimeout(timeOut) .setUuid(uuid) .setIsActor(true) .setIsOneWay(true) .setIsEscaped(false) - if (senderOption.isDefined) { - val sender = senderOption.get.actor - requestBuilder.setSourceTarget(sender.getClass.getName) - requestBuilder.setSourceUuid(sender.uuid) - val (host, port) = sender._replyToAddress.map(address => - (address.getHostName, address.getPort)).getOrElse((Actor.HOSTNAME, Actor.PORT)) - requestBuilder.setSourceHostname(host) - requestBuilder.setSourcePort(port) - } + senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol)) RemoteProtocolBuilder.setMessage(message, requestBuilder) remoteClient.send[Any](requestBuilder.build, None) } @@ -89,14 +81,15 @@ private[akka] class RemoteActorProxy private ( message: Any, timeout: Long, senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { - val requestBuilder = RemoteRequest.newBuilder - .setId(RemoteRequestIdFactory.nextId) + val requestBuilder = RemoteRequestProtocol.newBuilder + .setId(RemoteRequestProtocolIdFactory.nextId) .setTarget(className) .setTimeout(timeout) .setUuid(uuid) .setIsActor(true) .setIsOneWay(false) .setIsEscaped(false) + //senderOption.foreach(sender => requestBuilder.setSender(sender.toProtocol)) RemoteProtocolBuilder.setMessage(message, requestBuilder) val future = remoteClient.send(requestBuilder.build, senderFuture) if (future.isDefined) future.get @@ -121,14 +114,14 @@ object RemoteClient extends Logging { def actorFor(className: String, hostname: String, port: Int): ActorRef = actorFor(className, className, 5000L, hostname, port) - def actorFor(actorId: String, className: String, hostname: String, port: Int): ActorRef = - actorFor(actorId, className, 5000L, hostname, port) + def actorFor(actorRef: String, className: String, hostname: String, port: Int): ActorRef = + actorFor(actorRef, className, 5000L, hostname, port) def actorFor(className: String, timeout: Long, hostname: String, port: Int): ActorRef = actorFor(className, className, timeout, hostname, port) - def actorFor(actorId: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef = - RemoteActorProxy(actorId, className, hostname, port, timeout) + def actorFor(actorRef: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef = + RemoteActorProxy(actorRef, className, hostname, port, timeout) def clientFor(hostname: String, port: Int): RemoteClient = clientFor(new InetSocketAddress(hostname, port)) @@ -237,7 +230,7 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging { } } - def send[T](request: RemoteRequest, senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = if (isRunning) { + def send[T](request: RemoteRequestProtocol, senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = if (isRunning) { if (request.getIsOneWay) { connection.getChannel.write(request) None @@ -256,17 +249,17 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging { throw exception } - def registerSupervisorForActor(actorId: ActorRef) = - if (!actorId.supervisor.isDefined) throw new IllegalStateException("Can't register supervisor for " + actorId + " since it is not under supervision") - else supervisors.putIfAbsent(actorId.supervisor.get.uuid, actorId) + def registerSupervisorForActor(actorRef: ActorRef) = + if (!actorRef.supervisor.isDefined) throw new IllegalStateException("Can't register supervisor for " + actorRef + " since it is not under supervision") + else supervisors.putIfAbsent(actorRef.supervisor.get.uuid, actorRef) - def deregisterSupervisorForActor(actorId: ActorRef) = - if (!actorId.supervisor.isDefined) throw new IllegalStateException("Can't unregister supervisor for " + actorId + " since it is not under supervision") - else supervisors.remove(actorId.supervisor.get.uuid) + def deregisterSupervisorForActor(actorRef: ActorRef) = + if (!actorRef.supervisor.isDefined) throw new IllegalStateException("Can't unregister supervisor for " + actorRef + " since it is not under supervision") + else supervisors.remove(actorRef.supervisor.get.uuid) - def registerListener(actorId: ActorRef) = listeners.add(actorId) + def registerListener(actorRef: ActorRef) = listeners.add(actorRef) - def deregisterListener(actorId: ActorRef) = listeners.remove(actorId) + def deregisterListener(actorRef: ActorRef) = listeners.remove(actorRef) } /** @@ -283,7 +276,7 @@ class RemoteClientPipelineFactory(name: String, val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT) val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4) val lenPrep = new LengthFieldPrepender(4) - val protobufDec = new ProtobufDecoder(RemoteReply.getDefaultInstance) + val protobufDec = new ProtobufDecoder(RemoteReplyProtocol.getDefaultInstance) val protobufEnc = new ProtobufEncoder val zipCodec = RemoteServer.COMPRESSION_SCHEME match { case "zlib" => Some(Codec(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL), new ZlibDecoder)) @@ -323,9 +316,9 @@ class RemoteClientHandler(val name: String, override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) { try { val result = event.getMessage - if (result.isInstanceOf[RemoteReply]) { - val reply = result.asInstanceOf[RemoteReply] - log.debug("Remote client received RemoteReply[\n%s]", reply.toString) + if (result.isInstanceOf[RemoteReplyProtocol]) { + val reply = result.asInstanceOf[RemoteReplyProtocol] + log.debug("Remote client received RemoteReplyProtocol[\n%s]", reply.toString) val future = futures.get(reply.getId).asInstanceOf[CompletableFuture[Any]] if (reply.getIsSuccessful) { val message = RemoteProtocolBuilder.getMessage(reply) @@ -388,7 +381,7 @@ class RemoteClientHandler(val name: String, event.getChannel.close } - private def parseException(reply: RemoteReply) = { + private def parseException(reply: RemoteReplyProtocol) = { val exception = reply.getException val exceptionType = Class.forName(exception.substring(0, exception.indexOf('$'))) val exceptionMessage = exception.substring(exception.indexOf('$') + 1, exception.length) diff --git a/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala b/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala index b95ac210f5..b9c446d3d3 100644 --- a/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala +++ b/akka-core/src/main/scala/remote/RemoteProtocolBuilder.scala @@ -6,7 +6,7 @@ package se.scalablesolutions.akka.remote import se.scalablesolutions.akka.serialization.Serializable.SBinary import se.scalablesolutions.akka.serialization.{Serializer, Serializable, SerializationProtocol} -import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequest, RemoteReply} +import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteRequestProtocol, RemoteReplyProtocol} import com.google.protobuf.{Message, ByteString} @@ -23,7 +23,7 @@ object RemoteProtocolBuilder { SERIALIZER_SCALA_JSON.classLoader = Some(cl) } - def getMessage(request: RemoteRequest): Any = { + def getMessage(request: RemoteRequestProtocol): Any = { request.getProtocol match { case SerializationProtocol.JAVA => unbox(SERIALIZER_JAVA.in(request.getMessage.toByteArray, None)) @@ -42,7 +42,7 @@ object RemoteProtocolBuilder { } } - def getMessage(reply: RemoteReply): Any = { + def getMessage(reply: RemoteReplyProtocol): Any = { reply.getProtocol match { case SerializationProtocol.JAVA => unbox(SERIALIZER_JAVA.in(reply.getMessage.toByteArray, None)) @@ -61,7 +61,7 @@ object RemoteProtocolBuilder { } } - def setMessage(message: Any, builder: RemoteRequest.Builder) = { + def setMessage(message: Any, builder: RemoteRequestProtocol.Builder) = { if (message.isInstanceOf[Serializable.SBinary[_]]) { val serializable = message.asInstanceOf[Serializable.SBinary[_ <: Any]] builder.setProtocol(SerializationProtocol.SBINARY) @@ -89,7 +89,7 @@ object RemoteProtocolBuilder { } } - def setMessage(message: Any, builder: RemoteReply.Builder) = { + def setMessage(message: Any, builder: RemoteReplyProtocol.Builder) = { if (message.isInstanceOf[Serializable.SBinary[_]]) { val serializable = message.asInstanceOf[Serializable.SBinary[_ <: Any]] builder.setProtocol(SerializationProtocol.SBINARY) diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index bfe7e6355a..6703d677c0 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -11,7 +11,7 @@ import java.util.{Map => JMap} import se.scalablesolutions.akka.actor._ import se.scalablesolutions.akka.util._ -import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.{RemoteReply, RemoteRequest} +import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol._ import se.scalablesolutions.akka.config.Config.config import org.jboss.netty.bootstrap.ServerBootstrap @@ -199,37 +199,54 @@ class RemoteServer extends Logging { /** * Register Remote Actor by the Actor's 'id' field. */ - def register(actor: ActorRef) = synchronized { + def register(actorRef: ActorRef) = synchronized { if (_isRunning) { - log.info("Registering server side remote actor [%s] with id [%s]", actor.actorClass.getName, actor.id) - RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actor.id, actor) + log.info("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, actorRef.id) + RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(actorRef.id, actorRef) } } /** * Register Remote Actor by a specific 'id' passed as argument. + *

+ * NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself. */ - def register(id: String, actor: ActorRef) = synchronized { + def register(id: String, actorRef: ActorRef) = synchronized { if (_isRunning) { - log.info("Registering server side remote actor [%s] with id [%s]", actor.actorClass.getName, id) - RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(id, actor) + log.info("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, id) + RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors.put(id, actorRef) } } /** - * Unregister Remote Actor. + * Unregister Remote Actor that is registered using its 'id' field (not custom ID). */ - def unregister(actor: ActorID) = synchronized { + def unregister(actorRef: ActorRef) = synchronized { if (_isRunning) { - log.info("Unregistering server side remote actor [%s] with id [%s]", actor.actorClass.getName, actor.id) + log.info("Unregistering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, actorRef.id) val server = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)) - server.actors.put(actor.id, actor) - if (actor.actor._registeredInRemoteNodeDuringSerialization) server.actors.remove(actor.uuid) + server.actors.remove(actorRef.id) + if (actorRef.actor._registeredInRemoteNodeDuringSerialization) server.actors.remove(actorRef.uuid) + } + } + + /** + * Unregister Remote Actor by specific 'id'. + *

+ * NOTE: You need to call this method if you have registered an actor by a custom ID. + */ + def unregister(id: String) = synchronized { + if (_isRunning) { + log.info("Unregistering server side remote actor with id [%s]", id) + val server = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)) + val actorRef = server.actors.get(id) + server.actors.remove(id) + if (actorRef.actor._registeredInRemoteNodeDuringSerialization) server.actors.remove(actorRef.uuid) } } } -case class Codec(encoder : ChannelHandler, decoder : ChannelHandler) +case class Codec(encoder: ChannelHandler, decoder: ChannelHandler) /** * @author Jonas Bonér @@ -245,7 +262,7 @@ class RemoteServerPipelineFactory( def getPipeline: ChannelPipeline = { val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4) val lenPrep = new LengthFieldPrepender(4) - val protobufDec = new ProtobufDecoder(RemoteRequest.getDefaultInstance) + val protobufDec = new ProtobufDecoder(RemoteRequestProtocol.getDefaultInstance) val protobufEnc = new ProtobufEncoder val zipCodec = RemoteServer.COMPRESSION_SCHEME match { case "zlib" => Some(Codec(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL), new ZlibDecoder)) @@ -295,50 +312,37 @@ class RemoteServerHandler( override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = { val message = event.getMessage if (message eq null) throw new IllegalStateException("Message in remote MessageEvent is null: " + event) - if (message.isInstanceOf[RemoteRequest]) { - handleRemoteRequest(message.asInstanceOf[RemoteRequest], event.getChannel) + if (message.isInstanceOf[RemoteRequestProtocol]) { + handleRemoteRequestProtocol(message.asInstanceOf[RemoteRequestProtocol], event.getChannel) } } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { + event.getCause.printStackTrace log.error(event.getCause, "Unexpected exception from remote downstream") event.getChannel.close } - private def handleRemoteRequest(request: RemoteRequest, channel: Channel) = { - log.debug("Received RemoteRequest[\n%s]", request.toString) + private def handleRemoteRequestProtocol(request: RemoteRequestProtocol, channel: Channel) = { + log.debug("Received RemoteRequestProtocol[\n%s]", request.toString) if (request.getIsActor) dispatchToActor(request, channel) else dispatchToActiveObject(request, channel) } - private def dispatchToActor(request: RemoteRequest, channel: Channel) = { - log.debug("Dispatching to remote actor [%s]", request.getTarget) - val actorId = createActor(request.getTarget, request.getUuid, request.getTimeout) - actorId.start - + private def dispatchToActor(request: RemoteRequestProtocol, channel: Channel) = { + log.debug("Dispatching to remote actor [%s:%s]", request.getTarget, request.getUuid) + val actorRef = createActor(request.getTarget, request.getUuid, request.getTimeout) + actorRef.start val message = RemoteProtocolBuilder.getMessage(request) if (request.getIsOneWay) { - if (request.hasSourceHostname && request.hasSourcePort) { - // re-create the sending actor - val targetClass = if (request.hasSourceTarget) request.getSourceTarget - else request.getTarget - - val remoteActorId = createActor(targetClass, request.getSourceUuid, request.getTimeout) - if (!remoteActorId.isRunning) { - remoteActorId.makeRemote(request.getSourceHostname, request.getSourcePort) - remoteActorId.start - } - actorId.!(message)(Some(remoteActorId)) - } else { - // couldn't find a way to reply, send the message without a source/sender - actorId ! message - } + val sender = request.getSender + if (sender ne null) actorRef.!(message)(Some(ActorRef.fromProtocol(sender))) } else { try { - val resultOrNone = actorId !! message + val resultOrNone = actorRef !! message val result: AnyRef = if (resultOrNone.isDefined) resultOrNone.get else null log.debug("Returning result from actor invocation [%s]", result) - val replyBuilder = RemoteReply.newBuilder + val replyBuilder = RemoteReplyProtocol.newBuilder .setId(request.getId) .setIsSuccessful(true) .setIsActor(true) @@ -349,7 +353,7 @@ class RemoteServerHandler( } catch { case e: Throwable => log.error(e, "Could not invoke remote actor [%s]", request.getTarget) - val replyBuilder = RemoteReply.newBuilder + val replyBuilder = RemoteReplyProtocol.newBuilder .setId(request.getId) .setException(e.getClass.getName + "$" + e.getMessage) .setIsSuccessful(false) @@ -361,7 +365,7 @@ class RemoteServerHandler( } } - private def dispatchToActiveObject(request: RemoteRequest, channel: Channel) = { + private def dispatchToActiveObject(request: RemoteRequestProtocol, channel: Channel) = { log.debug("Dispatching to remote active object [%s :: %s]", request.getMethod, request.getTarget) val activeObject = createActiveObject(request.getTarget, request.getTimeout) @@ -377,7 +381,7 @@ class RemoteServerHandler( else { val result = messageReceiver.invoke(activeObject, unescapedArgs: _*) log.debug("Returning result from remote active object invocation [%s]", result) - val replyBuilder = RemoteReply.newBuilder + val replyBuilder = RemoteReplyProtocol.newBuilder .setId(request.getId) .setIsSuccessful(true) .setIsActor(false) @@ -389,7 +393,7 @@ class RemoteServerHandler( } catch { case e: InvocationTargetException => log.error(e.getCause, "Could not invoke remote active object [%s :: %s]", request.getMethod, request.getTarget) - val replyBuilder = RemoteReply.newBuilder + val replyBuilder = RemoteReplyProtocol.newBuilder .setId(request.getId) .setException(e.getCause.getClass.getName + "$" + e.getCause.getMessage) .setIsSuccessful(false) @@ -399,7 +403,7 @@ class RemoteServerHandler( channel.write(replyMessage) case e: Throwable => log.error(e.getCause, "Could not invoke remote active object [%s :: %s]", request.getMethod, request.getTarget) - val replyBuilder = RemoteReply.newBuilder + val replyBuilder = RemoteReplyProtocol.newBuilder .setId(request.getId) .setException(e.getClass.getName + "$" + e.getMessage) .setIsSuccessful(false) @@ -454,8 +458,9 @@ class RemoteServerHandler( * Does not start the actor. */ private def createActor(name: String, uuid: String, timeout: Long): ActorRef = { - val actorIdOrNull = actors.get(uuid) - if (actorIdOrNull eq null) { + val actorRefOrNull = actors.get(uuid) + println("----------- ACTOR " + actorRefOrNull + " " + uuid) + if (actorRefOrNull eq null) { try { log.info("Creating a new remote actor [%s:%s]", name, uuid) val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name) @@ -464,14 +469,14 @@ class RemoteServerHandler( newInstance._uuid = uuid newInstance.timeout = timeout newInstance._remoteAddress = None - val actorId = new ActorRef(() => newInstance) - actors.put(uuid, actorId) - actorId + val actorRef = new ActorRef(() => newInstance) + actors.put(uuid, actorRef) + actorRef } catch { case e => log.error(e, "Could not create remote actor instance") throw e } - } else actorIdOrNull + } else actorRefOrNull } } diff --git a/akka-core/src/test/scala/FutureSpec.scala b/akka-core/src/test/scala/FutureSpec.scala index 59822d9ea7..4f9da6572f 100644 --- a/akka-core/src/test/scala/FutureSpec.scala +++ b/akka-core/src/test/scala/FutureSpec.scala @@ -20,7 +20,7 @@ object FutureSpec { class FutureSpec extends JUnitSuite { import FutureSpec._ - @Test def shouldActorReplyResultThroughExplicitFuture = { + @Test def shouldActorReplyResultThroughExplicitFuture { val actor = newActor[TestActor] actor.start val future = actor !!! "Hello" @@ -30,7 +30,7 @@ class FutureSpec extends JUnitSuite { actor.stop } - @Test def shouldActorReplyExceptionThroughExplicitFuture = { + @Test def shouldActorReplyExceptionThroughExplicitFuture { val actor = newActor[TestActor] actor.start val future = actor !!! "Failure" diff --git a/akka-core/src/test/scala/MemoryFootprintSpec.scala b/akka-core/src/test/scala/MemoryFootprintSpec.scala deleted file mode 100644 index 9efb8270cb..0000000000 --- a/akka-core/src/test/scala/MemoryFootprintSpec.scala +++ /dev/null @@ -1,36 +0,0 @@ -package se.scalablesolutions.akka.actor - -import org.scalatest.junit.JUnitSuite -import org.junit.Test -import Actor._ - -class MemoryFootprintSpec extends JUnitSuite { - class Mem extends Actor { - def receive = { - case _ => {} - } - } - - val NR_OF_ACTORS = 100000 - val MAX_MEMORY_FOOTPRINT_PER_ACTOR = 700 - - @Test - def actorsShouldHaveLessMemoryFootprintThan700Bytes = { - println("============== MEMORY FOOTPRINT TEST ==============") - // warm up - (1 until 10000).foreach(i => new Mem) - - // Actors are put in AspectRegistry when created so they won't be GCd here - - val totalMem = Runtime.getRuntime.totalMemory - Runtime.getRuntime.freeMemory - println("Memory before " + totalMem) - (1 until NR_OF_ACTORS).foreach(i => new Mem) - - val newTotalMem = Runtime.getRuntime.totalMemory - Runtime.getRuntime.freeMemory - println("Memory aftor " + newTotalMem) - val memPerActor = (newTotalMem - totalMem) / NR_OF_ACTORS - - println("Memory footprint per actor is : " + memPerActor) - assert(memPerActor < MAX_MEMORY_FOOTPRINT_PER_ACTOR) // memory per actor should be less than 630 bytes - } -} diff --git a/akka-core/src/test/scala/ServerInitiatedRemoteActorSpec.scala b/akka-core/src/test/scala/ServerInitiatedRemoteActorSpec.scala index 6ecefca1e3..51d1e0876f 100644 --- a/akka-core/src/test/scala/ServerInitiatedRemoteActorSpec.scala +++ b/akka-core/src/test/scala/ServerInitiatedRemoteActorSpec.scala @@ -105,7 +105,7 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { } @Test - def shouldSendRemoteReply = { + def shouldSendRemoteReplyProtocol = { implicit val timeout = 500000000L val actor = RemoteClient.actorFor( "se.scalablesolutions.akka.actor.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorBidirectional", diff --git a/akka-util-java/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.java b/akka-util-java/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.java index 724978ef12..69a50a4bd2 100644 --- a/akka-util-java/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.java +++ b/akka-util-java/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.java @@ -7,28 +7,28 @@ public final class RemoteProtocol { public static void registerAllExtensions( com.google.protobuf.ExtensionRegistry registry) { } - public static final class ActorRef extends + public static final class ActorRefProtocol extends com.google.protobuf.GeneratedMessage { - // Use ActorRef.newBuilder() to construct. - private ActorRef() {} + // Use ActorRefProtocol.newBuilder() to construct. + private ActorRefProtocol() {} - private static final ActorRef defaultInstance = new ActorRef(); - public static ActorRef getDefaultInstance() { + private static final ActorRefProtocol defaultInstance = new ActorRefProtocol(); + public static ActorRefProtocol getDefaultInstance() { return defaultInstance; } - public ActorRef getDefaultInstanceForType() { + public ActorRefProtocol getDefaultInstanceForType() { return defaultInstance; } public static final com.google.protobuf.Descriptors.Descriptor getDescriptor() { - return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRef_descriptor; + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRefProtocol_descriptor; } protected com.google.protobuf.GeneratedMessage.FieldAccessorTable internalGetFieldAccessorTable() { - return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRef_fieldAccessorTable; + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRefProtocol_fieldAccessorTable; } // required string uuid = 1; @@ -126,57 +126,57 @@ public final class RemoteProtocol { return size; } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol parseFrom( com.google.protobuf.ByteString data) throws com.google.protobuf.InvalidProtocolBufferException { return newBuilder().mergeFrom(data).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol parseFrom( com.google.protobuf.ByteString data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return newBuilder().mergeFrom(data, extensionRegistry) .buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseFrom(byte[] data) + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol parseFrom(byte[] data) throws com.google.protobuf.InvalidProtocolBufferException { return newBuilder().mergeFrom(data).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol parseFrom( byte[] data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return newBuilder().mergeFrom(data, extensionRegistry) .buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseFrom(java.io.InputStream input) + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol parseFrom(java.io.InputStream input) throws java.io.IOException { return newBuilder().mergeFrom(input).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol parseFrom( java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return newBuilder().mergeFrom(input, extensionRegistry) .buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseDelimitedFrom(java.io.InputStream input) + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol parseDelimitedFrom(java.io.InputStream input) throws java.io.IOException { return newBuilder().mergeDelimitedFrom(input).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseDelimitedFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol parseDelimitedFrom( java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return newBuilder().mergeDelimitedFrom(input, extensionRegistry) .buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol parseFrom( com.google.protobuf.CodedInputStream input) throws java.io.IOException { return newBuilder().mergeFrom(input).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol parseFrom( com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { @@ -186,25 +186,25 @@ public final class RemoteProtocol { public static Builder newBuilder() { return Builder.create(); } public Builder newBuilderForType() { return newBuilder(); } - public static Builder newBuilder(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef prototype) { + public static Builder newBuilder(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol prototype) { return newBuilder().mergeFrom(prototype); } public Builder toBuilder() { return newBuilder(this); } public static final class Builder extends com.google.protobuf.GeneratedMessage.Builder { - private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef result; + private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol result; - // Construct using se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef.newBuilder() + // Construct using se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.newBuilder() private Builder() {} private static Builder create() { Builder builder = new Builder(); - builder.result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef(); + builder.result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol(); return builder; } - protected se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef internalGetResult() { + protected se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol internalGetResult() { return result; } @@ -213,7 +213,7 @@ public final class RemoteProtocol { throw new IllegalStateException( "Cannot call clear() after build()."); } - result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef(); + result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol(); return this; } @@ -223,24 +223,24 @@ public final class RemoteProtocol { public com.google.protobuf.Descriptors.Descriptor getDescriptorForType() { - return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef.getDescriptor(); + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.getDescriptor(); } - public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef getDefaultInstanceForType() { - return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef.getDefaultInstance(); + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol getDefaultInstanceForType() { + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.getDefaultInstance(); } public boolean isInitialized() { return result.isInitialized(); } - public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef build() { + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol build() { if (result != null && !isInitialized()) { throw newUninitializedMessageException(result); } return buildPartial(); } - private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef buildParsed() + private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol buildParsed() throws com.google.protobuf.InvalidProtocolBufferException { if (!isInitialized()) { throw newUninitializedMessageException( @@ -249,27 +249,27 @@ public final class RemoteProtocol { return buildPartial(); } - public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef buildPartial() { + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol buildPartial() { if (result == null) { throw new IllegalStateException( "build() has already been called on this Builder."); } - se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef returnMe = result; + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol returnMe = result; result = null; return returnMe; } public Builder mergeFrom(com.google.protobuf.Message other) { - if (other instanceof se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef) { - return mergeFrom((se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef)other); + if (other instanceof se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol) { + return mergeFrom((se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol)other); } else { super.mergeFrom(other); return this; } } - public Builder mergeFrom(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef other) { - if (other == se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef.getDefaultInstance()) return this; + public Builder mergeFrom(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol other) { + if (other == se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.getDefaultInstance()) return this; if (other.hasUuid()) { setUuid(other.getUuid()); } @@ -444,28 +444,28 @@ public final class RemoteProtocol { } } - public static final class RemoteRequest extends + public static final class RemoteRequestProtocol extends com.google.protobuf.GeneratedMessage { - // Use RemoteRequest.newBuilder() to construct. - private RemoteRequest() {} + // Use RemoteRequestProtocol.newBuilder() to construct. + private RemoteRequestProtocol() {} - private static final RemoteRequest defaultInstance = new RemoteRequest(); - public static RemoteRequest getDefaultInstance() { + private static final RemoteRequestProtocol defaultInstance = new RemoteRequestProtocol(); + public static RemoteRequestProtocol getDefaultInstance() { return defaultInstance; } - public RemoteRequest getDefaultInstanceForType() { + public RemoteRequestProtocol getDefaultInstanceForType() { return defaultInstance; } public static final com.google.protobuf.Descriptors.Descriptor getDescriptor() { - return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequest_descriptor; + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequestProtocol_descriptor; } protected com.google.protobuf.GeneratedMessage.FieldAccessorTable internalGetFieldAccessorTable() { - return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequest_fieldAccessorTable; + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequestProtocol_fieldAccessorTable; } // required uint64 id = 1; @@ -552,33 +552,12 @@ public final class RemoteProtocol { public boolean hasIsEscaped() { return hasIsEscaped; } public boolean getIsEscaped() { return isEscaped_; } - // optional string sourceHostname = 13; - public static final int SOURCEHOSTNAME_FIELD_NUMBER = 13; - private boolean hasSourceHostname; - private java.lang.String sourceHostname_ = ""; - public boolean hasSourceHostname() { return hasSourceHostname; } - public java.lang.String getSourceHostname() { return sourceHostname_; } - - // optional uint32 sourcePort = 14; - public static final int SOURCEPORT_FIELD_NUMBER = 14; - private boolean hasSourcePort; - private int sourcePort_ = 0; - public boolean hasSourcePort() { return hasSourcePort; } - public int getSourcePort() { return sourcePort_; } - - // optional string sourceTarget = 15; - public static final int SOURCETARGET_FIELD_NUMBER = 15; - private boolean hasSourceTarget; - private java.lang.String sourceTarget_ = ""; - public boolean hasSourceTarget() { return hasSourceTarget; } - public java.lang.String getSourceTarget() { return sourceTarget_; } - - // optional string sourceUuid = 16; - public static final int SOURCEUUID_FIELD_NUMBER = 16; - private boolean hasSourceUuid; - private java.lang.String sourceUuid_ = ""; - public boolean hasSourceUuid() { return hasSourceUuid; } - public java.lang.String getSourceUuid() { return sourceUuid_; } + // optional .se.scalablesolutions.akka.remote.protobuf.ActorRefProtocol sender = 13; + public static final int SENDER_FIELD_NUMBER = 13; + private boolean hasSender; + private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol sender_ = se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.getDefaultInstance(); + public boolean hasSender() { return hasSender; } + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol getSender() { return sender_; } public final boolean isInitialized() { if (!hasId) return false; @@ -590,6 +569,9 @@ public final class RemoteProtocol { if (!hasIsActor) return false; if (!hasIsOneWay) return false; if (!hasIsEscaped) return false; + if (hasSender()) { + if (!getSender().isInitialized()) return false; + } return true; } @@ -631,17 +613,8 @@ public final class RemoteProtocol { if (hasIsEscaped()) { output.writeBool(12, getIsEscaped()); } - if (hasSourceHostname()) { - output.writeString(13, getSourceHostname()); - } - if (hasSourcePort()) { - output.writeUInt32(14, getSourcePort()); - } - if (hasSourceTarget()) { - output.writeString(15, getSourceTarget()); - } - if (hasSourceUuid()) { - output.writeString(16, getSourceUuid()); + if (hasSender()) { + output.writeMessage(13, getSender()); } getUnknownFields().writeTo(output); } @@ -700,78 +673,66 @@ public final class RemoteProtocol { size += com.google.protobuf.CodedOutputStream .computeBoolSize(12, getIsEscaped()); } - if (hasSourceHostname()) { + if (hasSender()) { size += com.google.protobuf.CodedOutputStream - .computeStringSize(13, getSourceHostname()); - } - if (hasSourcePort()) { - size += com.google.protobuf.CodedOutputStream - .computeUInt32Size(14, getSourcePort()); - } - if (hasSourceTarget()) { - size += com.google.protobuf.CodedOutputStream - .computeStringSize(15, getSourceTarget()); - } - if (hasSourceUuid()) { - size += com.google.protobuf.CodedOutputStream - .computeStringSize(16, getSourceUuid()); + .computeMessageSize(13, getSender()); } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol parseFrom( com.google.protobuf.ByteString data) throws com.google.protobuf.InvalidProtocolBufferException { return newBuilder().mergeFrom(data).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol parseFrom( com.google.protobuf.ByteString data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return newBuilder().mergeFrom(data, extensionRegistry) .buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest parseFrom(byte[] data) + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol parseFrom(byte[] data) throws com.google.protobuf.InvalidProtocolBufferException { return newBuilder().mergeFrom(data).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol parseFrom( byte[] data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return newBuilder().mergeFrom(data, extensionRegistry) .buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest parseFrom(java.io.InputStream input) + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol parseFrom(java.io.InputStream input) throws java.io.IOException { return newBuilder().mergeFrom(input).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol parseFrom( java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return newBuilder().mergeFrom(input, extensionRegistry) .buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest parseDelimitedFrom(java.io.InputStream input) + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol parseDelimitedFrom(java.io.InputStream input) throws java.io.IOException { return newBuilder().mergeDelimitedFrom(input).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest parseDelimitedFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol parseDelimitedFrom( java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return newBuilder().mergeDelimitedFrom(input, extensionRegistry) .buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol parseFrom( com.google.protobuf.CodedInputStream input) throws java.io.IOException { return newBuilder().mergeFrom(input).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol parseFrom( com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { @@ -781,25 +742,25 @@ public final class RemoteProtocol { public static Builder newBuilder() { return Builder.create(); } public Builder newBuilderForType() { return newBuilder(); } - public static Builder newBuilder(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest prototype) { + public static Builder newBuilder(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol prototype) { return newBuilder().mergeFrom(prototype); } public Builder toBuilder() { return newBuilder(this); } public static final class Builder extends com.google.protobuf.GeneratedMessage.Builder { - private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest result; + private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol result; - // Construct using se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest.newBuilder() + // Construct using se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol.newBuilder() private Builder() {} private static Builder create() { Builder builder = new Builder(); - builder.result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest(); + builder.result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol(); return builder; } - protected se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest internalGetResult() { + protected se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol internalGetResult() { return result; } @@ -808,7 +769,7 @@ public final class RemoteProtocol { throw new IllegalStateException( "Cannot call clear() after build()."); } - result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest(); + result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol(); return this; } @@ -818,24 +779,24 @@ public final class RemoteProtocol { public com.google.protobuf.Descriptors.Descriptor getDescriptorForType() { - return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest.getDescriptor(); + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol.getDescriptor(); } - public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest getDefaultInstanceForType() { - return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest.getDefaultInstance(); + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol getDefaultInstanceForType() { + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol.getDefaultInstance(); } public boolean isInitialized() { return result.isInitialized(); } - public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest build() { + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol build() { if (result != null && !isInitialized()) { throw newUninitializedMessageException(result); } return buildPartial(); } - private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest buildParsed() + private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol buildParsed() throws com.google.protobuf.InvalidProtocolBufferException { if (!isInitialized()) { throw newUninitializedMessageException( @@ -844,27 +805,27 @@ public final class RemoteProtocol { return buildPartial(); } - public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest buildPartial() { + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol buildPartial() { if (result == null) { throw new IllegalStateException( "build() has already been called on this Builder."); } - se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest returnMe = result; + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol returnMe = result; result = null; return returnMe; } public Builder mergeFrom(com.google.protobuf.Message other) { - if (other instanceof se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest) { - return mergeFrom((se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest)other); + if (other instanceof se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol) { + return mergeFrom((se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol)other); } else { super.mergeFrom(other); return this; } } - public Builder mergeFrom(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest other) { - if (other == se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest.getDefaultInstance()) return this; + public Builder mergeFrom(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol other) { + if (other == se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol.getDefaultInstance()) return this; if (other.hasId()) { setId(other.getId()); } @@ -901,17 +862,8 @@ public final class RemoteProtocol { if (other.hasIsEscaped()) { setIsEscaped(other.getIsEscaped()); } - if (other.hasSourceHostname()) { - setSourceHostname(other.getSourceHostname()); - } - if (other.hasSourcePort()) { - setSourcePort(other.getSourcePort()); - } - if (other.hasSourceTarget()) { - setSourceTarget(other.getSourceTarget()); - } - if (other.hasSourceUuid()) { - setSourceUuid(other.getSourceUuid()); + if (other.hasSender()) { + mergeSender(other.getSender()); } this.mergeUnknownFields(other.getUnknownFields()); return this; @@ -987,19 +939,12 @@ public final class RemoteProtocol { break; } case 106: { - setSourceHostname(input.readString()); - break; - } - case 112: { - setSourcePort(input.readUInt32()); - break; - } - case 122: { - setSourceTarget(input.readString()); - break; - } - case 130: { - setSourceUuid(input.readString()); + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.Builder subBuilder = se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.newBuilder(); + if (hasSender()) { + subBuilder.mergeFrom(getSender()); + } + input.readMessage(subBuilder, extensionRegistry); + setSender(subBuilder.buildPartial()); break; } } @@ -1241,84 +1186,40 @@ public final class RemoteProtocol { return this; } - // optional string sourceHostname = 13; - public boolean hasSourceHostname() { - return result.hasSourceHostname(); + // optional .se.scalablesolutions.akka.remote.protobuf.ActorRefProtocol sender = 13; + public boolean hasSender() { + return result.hasSender(); } - public java.lang.String getSourceHostname() { - return result.getSourceHostname(); + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol getSender() { + return result.getSender(); } - public Builder setSourceHostname(java.lang.String value) { + public Builder setSender(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol value) { if (value == null) { - throw new NullPointerException(); - } - result.hasSourceHostname = true; - result.sourceHostname_ = value; + throw new NullPointerException(); + } + result.hasSender = true; + result.sender_ = value; return this; } - public Builder clearSourceHostname() { - result.hasSourceHostname = false; - result.sourceHostname_ = getDefaultInstance().getSourceHostname(); + public Builder setSender(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.Builder builderForValue) { + result.hasSender = true; + result.sender_ = builderForValue.build(); return this; } - - // optional uint32 sourcePort = 14; - public boolean hasSourcePort() { - return result.hasSourcePort(); - } - public int getSourcePort() { - return result.getSourcePort(); - } - public Builder setSourcePort(int value) { - result.hasSourcePort = true; - result.sourcePort_ = value; + public Builder mergeSender(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol value) { + if (result.hasSender() && + result.sender_ != se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.getDefaultInstance()) { + result.sender_ = + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.newBuilder(result.sender_).mergeFrom(value).buildPartial(); + } else { + result.sender_ = value; + } + result.hasSender = true; return this; } - public Builder clearSourcePort() { - result.hasSourcePort = false; - result.sourcePort_ = 0; - return this; - } - - // optional string sourceTarget = 15; - public boolean hasSourceTarget() { - return result.hasSourceTarget(); - } - public java.lang.String getSourceTarget() { - return result.getSourceTarget(); - } - public Builder setSourceTarget(java.lang.String value) { - if (value == null) { - throw new NullPointerException(); - } - result.hasSourceTarget = true; - result.sourceTarget_ = value; - return this; - } - public Builder clearSourceTarget() { - result.hasSourceTarget = false; - result.sourceTarget_ = getDefaultInstance().getSourceTarget(); - return this; - } - - // optional string sourceUuid = 16; - public boolean hasSourceUuid() { - return result.hasSourceUuid(); - } - public java.lang.String getSourceUuid() { - return result.getSourceUuid(); - } - public Builder setSourceUuid(java.lang.String value) { - if (value == null) { - throw new NullPointerException(); - } - result.hasSourceUuid = true; - result.sourceUuid_ = value; - return this; - } - public Builder clearSourceUuid() { - result.hasSourceUuid = false; - result.sourceUuid_ = getDefaultInstance().getSourceUuid(); + public Builder clearSender() { + result.hasSender = false; + result.sender_ = se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.getDefaultInstance(); return this; } } @@ -1332,28 +1233,28 @@ public final class RemoteProtocol { } } - public static final class RemoteReply extends + public static final class RemoteReplyProtocol extends com.google.protobuf.GeneratedMessage { - // Use RemoteReply.newBuilder() to construct. - private RemoteReply() {} + // Use RemoteReplyProtocol.newBuilder() to construct. + private RemoteReplyProtocol() {} - private static final RemoteReply defaultInstance = new RemoteReply(); - public static RemoteReply getDefaultInstance() { + private static final RemoteReplyProtocol defaultInstance = new RemoteReplyProtocol(); + public static RemoteReplyProtocol getDefaultInstance() { return defaultInstance; } - public RemoteReply getDefaultInstanceForType() { + public RemoteReplyProtocol getDefaultInstanceForType() { return defaultInstance; } public static final com.google.protobuf.Descriptors.Descriptor getDescriptor() { - return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReply_descriptor; + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReplyProtocol_descriptor; } protected com.google.protobuf.GeneratedMessage.FieldAccessorTable internalGetFieldAccessorTable() { - return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReply_fieldAccessorTable; + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReplyProtocol_fieldAccessorTable; } // required uint64 id = 1; @@ -1491,57 +1392,57 @@ public final class RemoteProtocol { return size; } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol parseFrom( com.google.protobuf.ByteString data) throws com.google.protobuf.InvalidProtocolBufferException { return newBuilder().mergeFrom(data).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol parseFrom( com.google.protobuf.ByteString data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return newBuilder().mergeFrom(data, extensionRegistry) .buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply parseFrom(byte[] data) + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol parseFrom(byte[] data) throws com.google.protobuf.InvalidProtocolBufferException { return newBuilder().mergeFrom(data).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol parseFrom( byte[] data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return newBuilder().mergeFrom(data, extensionRegistry) .buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply parseFrom(java.io.InputStream input) + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol parseFrom(java.io.InputStream input) throws java.io.IOException { return newBuilder().mergeFrom(input).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol parseFrom( java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return newBuilder().mergeFrom(input, extensionRegistry) .buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply parseDelimitedFrom(java.io.InputStream input) + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol parseDelimitedFrom(java.io.InputStream input) throws java.io.IOException { return newBuilder().mergeDelimitedFrom(input).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply parseDelimitedFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol parseDelimitedFrom( java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return newBuilder().mergeDelimitedFrom(input, extensionRegistry) .buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol parseFrom( com.google.protobuf.CodedInputStream input) throws java.io.IOException { return newBuilder().mergeFrom(input).buildParsed(); } - public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply parseFrom( + public static se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol parseFrom( com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { @@ -1551,25 +1452,25 @@ public final class RemoteProtocol { public static Builder newBuilder() { return Builder.create(); } public Builder newBuilderForType() { return newBuilder(); } - public static Builder newBuilder(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply prototype) { + public static Builder newBuilder(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol prototype) { return newBuilder().mergeFrom(prototype); } public Builder toBuilder() { return newBuilder(this); } public static final class Builder extends com.google.protobuf.GeneratedMessage.Builder { - private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply result; + private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol result; - // Construct using se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply.newBuilder() + // Construct using se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol.newBuilder() private Builder() {} private static Builder create() { Builder builder = new Builder(); - builder.result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply(); + builder.result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol(); return builder; } - protected se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply internalGetResult() { + protected se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol internalGetResult() { return result; } @@ -1578,7 +1479,7 @@ public final class RemoteProtocol { throw new IllegalStateException( "Cannot call clear() after build()."); } - result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply(); + result = new se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol(); return this; } @@ -1588,24 +1489,24 @@ public final class RemoteProtocol { public com.google.protobuf.Descriptors.Descriptor getDescriptorForType() { - return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply.getDescriptor(); + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol.getDescriptor(); } - public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply getDefaultInstanceForType() { - return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply.getDefaultInstance(); + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol getDefaultInstanceForType() { + return se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol.getDefaultInstance(); } public boolean isInitialized() { return result.isInitialized(); } - public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply build() { + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol build() { if (result != null && !isInitialized()) { throw newUninitializedMessageException(result); } return buildPartial(); } - private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply buildParsed() + private se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol buildParsed() throws com.google.protobuf.InvalidProtocolBufferException { if (!isInitialized()) { throw newUninitializedMessageException( @@ -1614,27 +1515,27 @@ public final class RemoteProtocol { return buildPartial(); } - public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply buildPartial() { + public se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol buildPartial() { if (result == null) { throw new IllegalStateException( "build() has already been called on this Builder."); } - se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply returnMe = result; + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol returnMe = result; result = null; return returnMe; } public Builder mergeFrom(com.google.protobuf.Message other) { - if (other instanceof se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply) { - return mergeFrom((se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply)other); + if (other instanceof se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol) { + return mergeFrom((se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol)other); } else { super.mergeFrom(other); return this; } } - public Builder mergeFrom(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply other) { - if (other == se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply.getDefaultInstance()) return this; + public Builder mergeFrom(se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol other) { + if (other == se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol.getDefaultInstance()) return this; if (other.hasId()) { setId(other.getId()); } @@ -1888,20 +1789,20 @@ public final class RemoteProtocol { } private static com.google.protobuf.Descriptors.Descriptor - internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRef_descriptor; + internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRefProtocol_descriptor; private static com.google.protobuf.GeneratedMessage.FieldAccessorTable - internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRef_fieldAccessorTable; + internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRefProtocol_fieldAccessorTable; private static com.google.protobuf.Descriptors.Descriptor - internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequest_descriptor; + internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequestProtocol_descriptor; private static com.google.protobuf.GeneratedMessage.FieldAccessorTable - internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequest_fieldAccessorTable; + internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequestProtocol_fieldAccessorTable; private static com.google.protobuf.Descriptors.Descriptor - internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReply_descriptor; + internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReplyProtocol_descriptor; private static com.google.protobuf.GeneratedMessage.FieldAccessorTable - internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReply_fieldAccessorTable; + internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReplyProtocol_fieldAccessorTable; public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() { @@ -1913,52 +1814,52 @@ public final class RemoteProtocol { java.lang.String[] descriptorData = { "\n>se/scalablesolutions/akka/remote/proto" + "buf/RemoteProtocol.proto\022)se.scalablesol" + - "utions.akka.remote.protobuf\"m\n\010ActorRef\022" + - "\014\n\004uuid\030\001 \002(\t\022\026\n\016actorClassName\030\002 \002(\t\022\026\n" + - "\016sourceHostname\030\003 \002(\t\022\022\n\nsourcePort\030\004 \002(" + - "\r\022\017\n\007timeout\030\005 \002(\004\"\272\002\n\rRemoteRequest\022\n\n\002" + - "id\030\001 \002(\004\022\020\n\010protocol\030\002 \002(\r\022\017\n\007message\030\003 " + - "\002(\014\022\027\n\017messageManifest\030\004 \001(\014\022\016\n\006method\030\005" + - " \001(\t\022\016\n\006target\030\006 \002(\t\022\014\n\004uuid\030\007 \002(\t\022\017\n\007ti" + - "meout\030\010 \002(\004\022\026\n\016supervisorUuid\030\t \001(\t\022\017\n\007i", - "sActor\030\n \002(\010\022\020\n\010isOneWay\030\013 \002(\010\022\021\n\tisEsca" + - "ped\030\014 \002(\010\022\026\n\016sourceHostname\030\r \001(\t\022\022\n\nsou" + - "rcePort\030\016 \001(\r\022\024\n\014sourceTarget\030\017 \001(\t\022\022\n\ns" + - "ourceUuid\030\020 \001(\t\"\247\001\n\013RemoteReply\022\n\n\002id\030\001 " + - "\002(\004\022\020\n\010protocol\030\002 \001(\r\022\017\n\007message\030\003 \001(\014\022\027" + - "\n\017messageManifest\030\004 \001(\014\022\021\n\texception\030\005 \001" + - "(\t\022\026\n\016supervisorUuid\030\006 \001(\t\022\017\n\007isActor\030\007 " + - "\002(\010\022\024\n\014isSuccessful\030\010 \002(\010" + "utions.akka.remote.protobuf\"u\n\020ActorRefP" + + "rotocol\022\014\n\004uuid\030\001 \002(\t\022\026\n\016actorClassName\030" + + "\002 \002(\t\022\026\n\016sourceHostname\030\003 \002(\t\022\022\n\nsourceP" + + "ort\030\004 \002(\r\022\017\n\007timeout\030\005 \002(\004\"\271\002\n\025RemoteReq" + + "uestProtocol\022\n\n\002id\030\001 \002(\004\022\020\n\010protocol\030\002 \002" + + "(\r\022\017\n\007message\030\003 \002(\014\022\027\n\017messageManifest\030\004" + + " \001(\014\022\016\n\006method\030\005 \001(\t\022\016\n\006target\030\006 \002(\t\022\014\n\004" + + "uuid\030\007 \002(\t\022\017\n\007timeout\030\010 \002(\004\022\026\n\016superviso", + "rUuid\030\t \001(\t\022\017\n\007isActor\030\n \002(\010\022\020\n\010isOneWay" + + "\030\013 \002(\010\022\021\n\tisEscaped\030\014 \002(\010\022K\n\006sender\030\r \001(" + + "\0132;.se.scalablesolutions.akka.remote.pro" + + "tobuf.ActorRefProtocol\"\257\001\n\023RemoteReplyPr" + + "otocol\022\n\n\002id\030\001 \002(\004\022\020\n\010protocol\030\002 \001(\r\022\017\n\007" + + "message\030\003 \001(\014\022\027\n\017messageManifest\030\004 \001(\014\022\021" + + "\n\texception\030\005 \001(\t\022\026\n\016supervisorUuid\030\006 \001(" + + "\t\022\017\n\007isActor\030\007 \002(\010\022\024\n\014isSuccessful\030\010 \002(\010" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { public com.google.protobuf.ExtensionRegistry assignDescriptors( com.google.protobuf.Descriptors.FileDescriptor root) { descriptor = root; - internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRef_descriptor = + internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRefProtocol_descriptor = getDescriptor().getMessageTypes().get(0); - internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRef_fieldAccessorTable = new + internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRefProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( - internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRef_descriptor, + internal_static_se_scalablesolutions_akka_remote_protobuf_ActorRefProtocol_descriptor, new java.lang.String[] { "Uuid", "ActorClassName", "SourceHostname", "SourcePort", "Timeout", }, - se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef.class, - se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRef.Builder.class); - internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequest_descriptor = + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.class, + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.ActorRefProtocol.Builder.class); + internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequestProtocol_descriptor = getDescriptor().getMessageTypes().get(1); - internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequest_fieldAccessorTable = new + internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequestProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( - internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequest_descriptor, - new java.lang.String[] { "Id", "Protocol", "Message", "MessageManifest", "Method", "Target", "Uuid", "Timeout", "SupervisorUuid", "IsActor", "IsOneWay", "IsEscaped", "SourceHostname", "SourcePort", "SourceTarget", "SourceUuid", }, - se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest.class, - se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest.Builder.class); - internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReply_descriptor = + internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteRequestProtocol_descriptor, + new java.lang.String[] { "Id", "Protocol", "Message", "MessageManifest", "Method", "Target", "Uuid", "Timeout", "SupervisorUuid", "IsActor", "IsOneWay", "IsEscaped", "Sender", }, + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol.class, + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequestProtocol.Builder.class); + internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReplyProtocol_descriptor = getDescriptor().getMessageTypes().get(2); - internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReply_fieldAccessorTable = new + internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReplyProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( - internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReply_descriptor, + internal_static_se_scalablesolutions_akka_remote_protobuf_RemoteReplyProtocol_descriptor, new java.lang.String[] { "Id", "Protocol", "Message", "MessageManifest", "Exception", "SupervisorUuid", "IsActor", "IsSuccessful", }, - se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply.class, - se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReply.Builder.class); + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol.class, + se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteReplyProtocol.Builder.class); return null; } }; diff --git a/akka-util-java/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.proto b/akka-util-java/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.proto index c2ccef040d..372691cd8b 100644 --- a/akka-util-java/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.proto +++ b/akka-util-java/src/main/java/se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.proto @@ -10,7 +10,7 @@ package se.scalablesolutions.akka.remote.protobuf; protoc se/scalablesolutions/akka/remote/protobuf/RemoteProtocol.proto --java_out . */ -message ActorRef { +message ActorRefProtocol { required string uuid = 1; required string actorClassName = 2; required string sourceHostname = 3; @@ -18,7 +18,7 @@ message ActorRef { required uint64 timeout = 5; } -message RemoteRequest { +message RemoteRequestProtocol { required uint64 id = 1; required uint32 protocol = 2; required bytes message = 3; @@ -31,13 +31,10 @@ message RemoteRequest { required bool isActor = 10; required bool isOneWay = 11; required bool isEscaped = 12; - optional string sourceHostname = 13; - optional uint32 sourcePort = 14; - optional string sourceTarget = 15; - optional string sourceUuid = 16; + optional ActorRefProtocol sender = 13; } -message RemoteReply { +message RemoteReplyProtocol { required uint64 id = 1; optional uint32 protocol = 2; optional bytes message = 3;