From ea8963ef6ebbb2b15e63cb19f91754b20eddd6f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mikael=20H=C3=B6gqvist?= Date: Mon, 14 Dec 2009 19:22:37 +0100 Subject: [PATCH] - Support for implicit sender with remote actors (fixes Issue #71) - The RemoteServer and RemoteClient was modified to support a clean shutdown when testing using multiple remote servers --- akka-actors/src/main/scala/actor/Actor.scala | 40 +++- .../src/main/scala/dispatch/Reactor.scala | 2 +- .../src/main/scala/nio/RemoteClient.scala | 29 ++- .../src/main/scala/nio/RemoteServer.scala | 46 ++++- .../src/test/scala/RemoteActorTest.scala | 98 ++++++++-- .../akka/nio/protobuf/RemoteProtocol.java | 181 +++++++++++++++++- .../akka/nio/protobuf/RemoteProtocol.proto | 4 + 7 files changed, 361 insertions(+), 39 deletions(-) diff --git a/akka-actors/src/main/scala/actor/Actor.scala b/akka-actors/src/main/scala/actor/Actor.scala index 17d158a467..6585bfaf41 100644 --- a/akka-actors/src/main/scala/actor/Actor.scala +++ b/akka-actors/src/main/scala/actor/Actor.scala @@ -71,6 +71,8 @@ class ActorMessageInvoker(val actor: Actor) extends MessageInvoker { object Actor extends Logging { val TIMEOUT = config.getInt("akka.actor.timeout", 5000) val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false) + val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost") + val PORT = config.getInt("akka.remote.server.port", 9999) object Sender extends Actor { implicit val Self: AnyRef = this @@ -235,7 +237,7 @@ trait Actor extends TransactionManagement { private[akka] var _remoteAddress: Option[InetSocketAddress] = None private[akka] var _linkedActors: Option[HashSet[Actor]] = None private[akka] var _supervisor: Option[Actor] = None - + private[akka] var _contactAddress: Option[InetSocketAddress] = None private[akka] val _mailbox: Queue[MessageInvocation] = new LinkedList[MessageInvocation] // ==================================== @@ -563,11 +565,11 @@ trait Actor extends TransactionManagement { throw new IllegalStateException( "\n\tNo sender in scope, can't reply. " + "\n\tYou have probably used the '!' method to either; " + - "\n\t\t1. Send a message to a remote actor" + + "\n\t\t1. Send a message to a remote actor which does not have a contact address." + "\n\t\t2. Send a message from an instance that is *not* an actor" + "\n\t\t3. Send a message to an Active Object annotated with the '@oneway' annotation? " + "\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future" + - "\n\tthat will be bound by the argument passed to 'reply'." ) + "\n\tthat will be bound by the argument passed to 'reply'. Alternatively, you can use setContactAddress to make sure the actor can be contacted over the network." ) case Some(future) => future.completeWithResult(message) } @@ -609,6 +611,17 @@ trait Actor extends TransactionManagement { _remoteAddress = Some(address) } + /** + * Set the contact address for this actor. This is used for replying to messages sent asynchronously when no reply channel exists. + */ + def setContactAddress(hostname:String, port:Int): Unit = { + setContactAddress(new InetSocketAddress(hostname, port)) + } + + def setContactAddress(address: InetSocketAddress): Unit = { + _contactAddress = Some(address) + } + /** * Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists. * However, it will always participate in an existing transaction. @@ -770,6 +783,27 @@ trait Actor extends TransactionManagement { .setIsEscaped(false) val id = registerSupervisorAsRemoteActor if (id.isDefined) requestBuilder.setSupervisorUuid(id.get) + + // set the source fields used to reply back to the original sender + // (i.e. not the remote proxy actor) + if(sender.isDefined) { + requestBuilder.setSourceTarget(sender.get.getClass.getName) + requestBuilder.setSourceUuid(sender.get.uuid) + log.debug("Setting sending actor as " + sender.get.getClass.getName + ", " + _contactAddress) + + if (sender.get._contactAddress.isDefined) { + val addr = sender.get._contactAddress.get + requestBuilder.setSourceHostname(addr.getHostName()) + requestBuilder.setSourcePort(addr.getPort()) + } else { + // set the contact address to the default values from the + // configuration file + requestBuilder.setSourceHostname(Actor.HOSTNAME) + requestBuilder.setSourcePort(Actor.PORT) + } + + } + RemoteProtocolBuilder.setMessage(message, requestBuilder) RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build) } else { diff --git a/akka-actors/src/main/scala/dispatch/Reactor.scala b/akka-actors/src/main/scala/dispatch/Reactor.scala index 339bed0fca..8b12b0b5bc 100644 --- a/akka-actors/src/main/scala/dispatch/Reactor.scala +++ b/akka-actors/src/main/scala/dispatch/Reactor.scala @@ -27,7 +27,7 @@ final class MessageInvocation(val receiver: Actor, override def hashCode(): Int = synchronized { var result = HashCode.SEED result = HashCode.hash(result, receiver) - result = HashCode.hash(result, message) + result = HashCode.hash(result, message.asInstanceOf[AnyRef]) result } diff --git a/akka-actors/src/main/scala/nio/RemoteClient.scala b/akka-actors/src/main/scala/nio/RemoteClient.scala index 3743699b3f..1daca82a6b 100644 --- a/akka-actors/src/main/scala/nio/RemoteClient.scala +++ b/akka-actors/src/main/scala/nio/RemoteClient.scala @@ -32,7 +32,7 @@ object RemoteClient extends Logging { val RECONNECT_DELAY = config.getInt("akka.remote.client.reconnect-delay", 5000) // TODO: add configuration optons: 'HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel)' - private[akka] val TIMER = new HashedWheelTimer +// private[akka] val TIMER = new HashedWheelTimer private val clients = new HashMap[String, RemoteClient] def clientFor(address: InetSocketAddress): RemoteClient = synchronized { @@ -47,6 +47,15 @@ object RemoteClient extends Logging { client } } + + /* + * Clean-up all open connections + */ + def shutdownAll() = synchronized { + clients.foreach({case (addr, client) => client.shutdown}) + clients.clear +// TIMER.stop + } } /** @@ -66,7 +75,9 @@ class RemoteClient(hostname: String, port: Int) extends Logging { private val bootstrap = new ClientBootstrap(channelFactory) - bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors, bootstrap)) + private val timer = new HashedWheelTimer + + bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors, bootstrap, timer)) bootstrap.setOption("tcpNoDelay", true) bootstrap.setOption("keepAlive", true) @@ -91,6 +102,8 @@ class RemoteClient(hostname: String, port: Int) extends Logging { connection.getChannel.getCloseFuture.awaitUninterruptibly channelFactory.releaseExternalResources } + + timer.stop } def send(request: RemoteRequest): Option[CompletableFutureResult] = if (isRunning) { @@ -124,10 +137,11 @@ class RemoteClient(hostname: String, port: Int) extends Logging { class RemoteClientPipelineFactory(name: String, futures: ConcurrentMap[Long, CompletableFutureResult], supervisors: ConcurrentMap[String, Actor], - bootstrap: ClientBootstrap) extends ChannelPipelineFactory { + bootstrap: ClientBootstrap, + timer: HashedWheelTimer) extends ChannelPipelineFactory { def getPipeline: ChannelPipeline = { val pipeline = Channels.pipeline() - pipeline.addLast("timeout", new ReadTimeoutHandler(RemoteClient.TIMER, RemoteClient.READ_TIMEOUT)) + pipeline.addLast("timeout", new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT)) RemoteServer.COMPRESSION_SCHEME match { case "zlib" => pipeline.addLast("zlibDecoder", new ZlibDecoder) //case "lzf" => pipeline.addLast("lzfDecoder", new LzfDecoder) @@ -142,7 +156,7 @@ class RemoteClientPipelineFactory(name: String, } pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)) pipeline.addLast("protobufEncoder", new ProtobufEncoder()) - pipeline.addLast("handler", new RemoteClientHandler(name, futures, supervisors, bootstrap)) + pipeline.addLast("handler", new RemoteClientHandler(name, futures, supervisors, bootstrap, timer)) pipeline } } @@ -154,7 +168,8 @@ class RemoteClientPipelineFactory(name: String, class RemoteClientHandler(val name: String, val futures: ConcurrentMap[Long, CompletableFutureResult], val supervisors: ConcurrentMap[String, Actor], - val bootstrap: ClientBootstrap) + val bootstrap: ClientBootstrap, + val timer: HashedWheelTimer) extends SimpleChannelUpstreamHandler with Logging { import Actor.Sender.Self @@ -196,7 +211,7 @@ class RemoteClientHandler(val name: String, } override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - RemoteClient.TIMER.newTimeout(new TimerTask() { + timer.newTimeout(new TimerTask() { def run(timeout: Timeout) = { log.debug("Remote client reconnecting to [%s]", ctx.getChannel.getRemoteAddress) bootstrap.connect diff --git a/akka-actors/src/main/scala/nio/RemoteServer.scala b/akka-actors/src/main/scala/nio/RemoteServer.scala index 5a542268c8..4c5a6dd701 100755 --- a/akka-actors/src/main/scala/nio/RemoteServer.scala +++ b/akka-actors/src/main/scala/nio/RemoteServer.scala @@ -15,6 +15,7 @@ import se.scalablesolutions.akka.Config.config import org.jboss.netty.bootstrap.ServerBootstrap import org.jboss.netty.channel._ +import org.jboss.netty.channel.group.{DefaultChannelGroup, ChannelGroup} import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, LengthFieldPrepender} import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder} @@ -79,7 +80,7 @@ class RemoteServer extends Logging { private var hostname = RemoteServer.HOSTNAME private var port = RemoteServer.PORT - + @volatile private var isRunning = false @volatile private var isConfigured = false @@ -89,6 +90,9 @@ class RemoteServer extends Logging { private val bootstrap = new ServerBootstrap(factory) + // group of open channels, used for clean-up + private val openChannels:ChannelGroup = new DefaultChannelGroup("akka-server") + def start: Unit = start(None) def start(loader: Option[ClassLoader]): Unit = start(hostname, port, loader) @@ -100,19 +104,20 @@ class RemoteServer extends Logging { hostname = _hostname port = _port log.info("Starting remote server at [%s:%s]", hostname, port) - bootstrap.setPipelineFactory(new RemoteServerPipelineFactory(name, loader)) + bootstrap.setPipelineFactory(new RemoteServerPipelineFactory(name, openChannels, loader)) // FIXME make these RemoteServer options configurable bootstrap.setOption("child.tcpNoDelay", true) bootstrap.setOption("child.keepAlive", true) bootstrap.setOption("child.reuseAddress", true) bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS) - bootstrap.bind(new InetSocketAddress(hostname, port)) + openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port))) isRunning = true } } def shutdown = { + openChannels.close.awaitUninterruptibly() bootstrap.releaseExternalResources } } @@ -120,7 +125,7 @@ class RemoteServer extends Logging { /** * @author Jonas Bonér */ -class RemoteServerPipelineFactory(name: String, loader: Option[ClassLoader]) +class RemoteServerPipelineFactory(name: String, openChannels: ChannelGroup, loader: Option[ClassLoader]) extends ChannelPipelineFactory { import RemoteServer._ @@ -140,7 +145,7 @@ class RemoteServerPipelineFactory(name: String, loader: Option[ClassLoader]) } pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)) pipeline.addLast("protobufEncoder", new ProtobufEncoder) - pipeline.addLast("handler", new RemoteServerHandler(name, loader)) + pipeline.addLast("handler", new RemoteServerHandler(name, openChannels, loader)) pipeline } } @@ -149,13 +154,22 @@ class RemoteServerPipelineFactory(name: String, loader: Option[ClassLoader]) * @author Jonas Bonér */ @ChannelPipelineCoverage { val value = "all" } -class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassLoader]) +class RemoteServerHandler(val name: String, openChannels: ChannelGroup, val applicationLoader: Option[ClassLoader]) extends SimpleChannelUpstreamHandler with Logging { val AW_PROXY_PREFIX = "$$ProxiedByAW".intern private val activeObjects = new ConcurrentHashMap[String, AnyRef] private val actors = new ConcurrentHashMap[String, Actor] + /* + * channelOpen overriden to store open channels for a clean shutdown + * of a RemoteServer. If a channel is closed before, it is + * automatically removed from the open channels group. + */ + override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) { + openChannels.add(ctx.getChannel) + } + override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { if (event.isInstanceOf[ChannelStateEvent] && event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) { @@ -190,7 +204,25 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL actor.start val message = RemoteProtocolBuilder.getMessage(request) if (request.getIsOneWay) { - actor.send(message) + if(request.hasSourceHostname && request.hasSourcePort) { + // re-create the sending actor + val targetClass = if(request.hasSourceTarget) request.getSourceTarget + else request.getTarget +/* val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(targetClass) + else Class.forName(targetClass) + val remoteActor = clazz.newInstance.asInstanceOf[Actor] + log.debug("Re-creating sending actor [%s]", targetClass) + remoteActor._uuid = request.getSourceUuid + remoteActor.timeout = request.getTimeout +*/ + val remoteActor = createActor(targetClass, request.getSourceUuid, request.getTimeout) + remoteActor.makeRemote(request.getSourceHostname, request.getSourcePort) + remoteActor.start + actor.!(message)(remoteActor) + } else { + // couldnt find a way to reply, send the message without a source/sender + actor.send(message) + } } else { try { diff --git a/akka-actors/src/test/scala/RemoteActorTest.scala b/akka-actors/src/test/scala/RemoteActorTest.scala index e2537ce9fd..51b1882342 100644 --- a/akka-actors/src/test/scala/RemoteActorTest.scala +++ b/akka-actors/src/test/scala/RemoteActorTest.scala @@ -4,13 +4,14 @@ import java.util.concurrent.TimeUnit import junit.framework.TestCase import org.scalatest.junit.JUnitSuite -import org.junit.Test +import org.junit.{Test, Before, After} -import se.scalablesolutions.akka.nio.{RemoteNode, RemoteServer} +import se.scalablesolutions.akka.nio.{RemoteServer, RemoteClient} import se.scalablesolutions.akka.dispatch.Dispatchers object Global { var oneWay = "nada" + var remoteReply = "nada" } class RemoteActorSpecActorUnidirectional extends Actor { dispatcher = Dispatchers.newThreadBasedDispatcher(this) @@ -22,8 +23,6 @@ class RemoteActorSpecActorUnidirectional extends Actor { } class RemoteActorSpecActorBidirectional extends Actor { - dispatcher = Dispatchers.newThreadBasedDispatcher(this) - def receive = { case "Hello" => reply("World") @@ -32,23 +31,58 @@ class RemoteActorSpecActorBidirectional extends Actor { } } +case class Send(actor:Actor) + +class RemoteActorSpecActorAsyncSender extends Actor { + def receive = { + case Send(actor:Actor) => + actor ! "Hello" + case "World" => + Global.remoteReply = "replied" + } + + def send(actor:Actor) { + this ! Send(actor) + } +} + class RemoteActorTest extends JUnitSuite { import Actor.Sender.Self akka.Config.config - new Thread(new Runnable() { - def run = { - RemoteNode.start - } - }).start - Thread.sleep(1000) - + + val HOSTNAME = "localhost" + val PORT1 = 9990 + val PORT2 = 9991 + var s1:RemoteServer = null + var s2:RemoteServer = null + + @Before + def init() { + s1 = new RemoteServer() + s2 = new RemoteServer() + + s1.start(HOSTNAME, PORT1) + s2.start(HOSTNAME, PORT2) + Thread.sleep(1000) + } + private val unit = TimeUnit.MILLISECONDS + // make sure the servers shutdown cleanly after the test has + // finished + @After + def finished() { + s1.shutdown + s2.shutdown + RemoteClient.shutdownAll + Thread.sleep(1000) + } + @Test def shouldSendOneWay = { val actor = new RemoteActorSpecActorUnidirectional - actor.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT) + actor.makeRemote(HOSTNAME, PORT1) actor.start val result = actor ! "OneWay" Thread.sleep(100) @@ -59,18 +93,54 @@ class RemoteActorTest extends JUnitSuite { @Test def shouldSendReplyAsync = { val actor = new RemoteActorSpecActorBidirectional - actor.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT) + actor.makeRemote(HOSTNAME, PORT1) actor.start val result = actor !! "Hello" assert("World" === result.get.asInstanceOf[String]) actor.stop } + @Test + def shouldSendRemoteReply = { + implicit val timeout = 500000000L + val actor = new RemoteActorSpecActorBidirectional + actor.makeRemote(HOSTNAME, PORT2) + actor.start + + val sender = new RemoteActorSpecActorAsyncSender + sender.setContactAddress(HOSTNAME, PORT1) + sender.start + sender.send(actor) + Thread.sleep(500) + assert("replied" === Global.remoteReply) + actor.stop + } + +/* + This test does not throw an exception since the + _contactAddress is always defined via the + global configuration if not set explicitly. + + @Test + def shouldSendRemoteReplyException = { + implicit val timeout = 500000000L + val actor = new RemoteActorSpecActorBidirectional + actor.makeRemote(HOSTNAME, PORT1) + actor.start + + val sender = new RemoteActorSpecActorAsyncSender + sender.start + sender.send(actor) + Thread.sleep(500) + assert("exception" === Global.remoteReply) + actor.stop + } +*/ @Test def shouldSendReceiveException = { implicit val timeout = 500000000L val actor = new RemoteActorSpecActorBidirectional - actor.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT) + actor.makeRemote(HOSTNAME, PORT1) actor.start try { actor !! "Failure" diff --git a/akka-util-java/src/main/java/se/scalablesolutions/akka/nio/protobuf/RemoteProtocol.java b/akka-util-java/src/main/java/se/scalablesolutions/akka/nio/protobuf/RemoteProtocol.java index 950cfeb918..0386755ba5 100644 --- a/akka-util-java/src/main/java/se/scalablesolutions/akka/nio/protobuf/RemoteProtocol.java +++ b/akka-util-java/src/main/java/se/scalablesolutions/akka/nio/protobuf/RemoteProtocol.java @@ -115,6 +115,34 @@ public final class RemoteProtocol { public boolean hasIsEscaped() { return hasIsEscaped; } public boolean getIsEscaped() { return isEscaped_; } + // optional string sourceHostname = 13; + public static final int SOURCEHOSTNAME_FIELD_NUMBER = 13; + private boolean hasSourceHostname; + private java.lang.String sourceHostname_ = ""; + public boolean hasSourceHostname() { return hasSourceHostname; } + public java.lang.String getSourceHostname() { return sourceHostname_; } + + // optional uint32 sourcePort = 14; + public static final int SOURCEPORT_FIELD_NUMBER = 14; + private boolean hasSourcePort; + private int sourcePort_ = 0; + public boolean hasSourcePort() { return hasSourcePort; } + public int getSourcePort() { return sourcePort_; } + + // optional string sourceTarget = 15; + public static final int SOURCETARGET_FIELD_NUMBER = 15; + private boolean hasSourceTarget; + private java.lang.String sourceTarget_ = ""; + public boolean hasSourceTarget() { return hasSourceTarget; } + public java.lang.String getSourceTarget() { return sourceTarget_; } + + // optional string sourceUuid = 16; + public static final int SOURCEUUID_FIELD_NUMBER = 16; + private boolean hasSourceUuid; + private java.lang.String sourceUuid_ = ""; + public boolean hasSourceUuid() { return hasSourceUuid; } + public java.lang.String getSourceUuid() { return sourceUuid_; } + public final boolean isInitialized() { if (!hasId) return false; if (!hasProtocol) return false; @@ -166,6 +194,18 @@ public final class RemoteProtocol { if (hasIsEscaped()) { output.writeBool(12, getIsEscaped()); } + if (hasSourceHostname()) { + output.writeString(13, getSourceHostname()); + } + if (hasSourcePort()) { + output.writeUInt32(14, getSourcePort()); + } + if (hasSourceTarget()) { + output.writeString(15, getSourceTarget()); + } + if (hasSourceUuid()) { + output.writeString(16, getSourceUuid()); + } getUnknownFields().writeTo(output); } @@ -223,6 +263,22 @@ public final class RemoteProtocol { size += com.google.protobuf.CodedOutputStream .computeBoolSize(12, getIsEscaped()); } + if (hasSourceHostname()) { + size += com.google.protobuf.CodedOutputStream + .computeStringSize(13, getSourceHostname()); + } + if (hasSourcePort()) { + size += com.google.protobuf.CodedOutputStream + .computeUInt32Size(14, getSourcePort()); + } + if (hasSourceTarget()) { + size += com.google.protobuf.CodedOutputStream + .computeStringSize(15, getSourceTarget()); + } + if (hasSourceUuid()) { + size += com.google.protobuf.CodedOutputStream + .computeStringSize(16, getSourceUuid()); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -408,6 +464,18 @@ public final class RemoteProtocol { if (other.hasIsEscaped()) { setIsEscaped(other.getIsEscaped()); } + if (other.hasSourceHostname()) { + setSourceHostname(other.getSourceHostname()); + } + if (other.hasSourcePort()) { + setSourcePort(other.getSourcePort()); + } + if (other.hasSourceTarget()) { + setSourceTarget(other.getSourceTarget()); + } + if (other.hasSourceUuid()) { + setSourceUuid(other.getSourceUuid()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -481,6 +549,22 @@ public final class RemoteProtocol { setIsEscaped(input.readBool()); break; } + case 106: { + setSourceHostname(input.readString()); + break; + } + case 112: { + setSourcePort(input.readUInt32()); + break; + } + case 122: { + setSourceTarget(input.readString()); + break; + } + case 130: { + setSourceUuid(input.readString()); + break; + } } } } @@ -719,6 +803,87 @@ public final class RemoteProtocol { result.isEscaped_ = false; return this; } + + // optional string sourceHostname = 13; + public boolean hasSourceHostname() { + return result.hasSourceHostname(); + } + public java.lang.String getSourceHostname() { + return result.getSourceHostname(); + } + public Builder setSourceHostname(java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + result.hasSourceHostname = true; + result.sourceHostname_ = value; + return this; + } + public Builder clearSourceHostname() { + result.hasSourceHostname = false; + result.sourceHostname_ = getDefaultInstance().getSourceHostname(); + return this; + } + + // optional uint32 sourcePort = 14; + public boolean hasSourcePort() { + return result.hasSourcePort(); + } + public int getSourcePort() { + return result.getSourcePort(); + } + public Builder setSourcePort(int value) { + result.hasSourcePort = true; + result.sourcePort_ = value; + return this; + } + public Builder clearSourcePort() { + result.hasSourcePort = false; + result.sourcePort_ = 0; + return this; + } + + // optional string sourceTarget = 15; + public boolean hasSourceTarget() { + return result.hasSourceTarget(); + } + public java.lang.String getSourceTarget() { + return result.getSourceTarget(); + } + public Builder setSourceTarget(java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + result.hasSourceTarget = true; + result.sourceTarget_ = value; + return this; + } + public Builder clearSourceTarget() { + result.hasSourceTarget = false; + result.sourceTarget_ = getDefaultInstance().getSourceTarget(); + return this; + } + + // optional string sourceUuid = 16; + public boolean hasSourceUuid() { + return result.hasSourceUuid(); + } + public java.lang.String getSourceUuid() { + return result.getSourceUuid(); + } + public Builder setSourceUuid(java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + result.hasSourceUuid = true; + result.sourceUuid_ = value; + return this; + } + public Builder clearSourceUuid() { + result.hasSourceUuid = false; + result.sourceUuid_ = getDefaultInstance().getSourceUuid(); + return this; + } } static { @@ -1306,17 +1471,19 @@ public final class RemoteProtocol { java.lang.String[] descriptorData = { "\n;se/scalablesolutions/akka/nio/protobuf" + "/RemoteProtocol.proto\022&se.scalablesoluti" + - "ons.akka.nio.protobuf\"\344\001\n\rRemoteRequest\022" + + "ons.akka.nio.protobuf\"\272\002\n\rRemoteRequest\022" + "\n\n\002id\030\001 \002(\004\022\020\n\010protocol\030\002 \002(\r\022\017\n\007message" + "\030\003 \002(\014\022\027\n\017messageManifest\030\004 \001(\014\022\016\n\006metho" + "d\030\005 \001(\t\022\016\n\006target\030\006 \002(\t\022\014\n\004uuid\030\007 \002(\t\022\017\n" + "\007timeout\030\010 \002(\004\022\026\n\016supervisorUuid\030\t \001(\t\022\017" + "\n\007isActor\030\n \002(\010\022\020\n\010isOneWay\030\013 \002(\010\022\021\n\tisE" + - "scaped\030\014 \002(\010\"\247\001\n\013RemoteReply\022\n\n\002id\030\001 \002(\004" + - "\022\020\n\010protocol\030\002 \001(\r\022\017\n\007message\030\003 \001(\014\022\027\n\017m", - "essageManifest\030\004 \001(\014\022\021\n\texception\030\005 \001(\t\022" + - "\026\n\016supervisorUuid\030\006 \001(\t\022\017\n\007isActor\030\007 \002(\010" + - "\022\024\n\014isSuccessful\030\010 \002(\010" + "scaped\030\014 \002(\010\022\026\n\016sourceHostname\030\r \001(\t\022\022\n\n" + + "sourcePort\030\016 \001(\r\022\024\n\014sourceTarget\030\017 \001(\t\022\022", + "\n\nsourceUuid\030\020 \001(\t\"\247\001\n\013RemoteReply\022\n\n\002id" + + "\030\001 \002(\004\022\020\n\010protocol\030\002 \001(\r\022\017\n\007message\030\003 \001(" + + "\014\022\027\n\017messageManifest\030\004 \001(\014\022\021\n\texception\030" + + "\005 \001(\t\022\026\n\016supervisorUuid\030\006 \001(\t\022\017\n\007isActor" + + "\030\007 \002(\010\022\024\n\014isSuccessful\030\010 \002(\010" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -1328,7 +1495,7 @@ public final class RemoteProtocol { internal_static_se_scalablesolutions_akka_nio_protobuf_RemoteRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_se_scalablesolutions_akka_nio_protobuf_RemoteRequest_descriptor, - new java.lang.String[] { "Id", "Protocol", "Message", "MessageManifest", "Method", "Target", "Uuid", "Timeout", "SupervisorUuid", "IsActor", "IsOneWay", "IsEscaped", }, + new java.lang.String[] { "Id", "Protocol", "Message", "MessageManifest", "Method", "Target", "Uuid", "Timeout", "SupervisorUuid", "IsActor", "IsOneWay", "IsEscaped", "SourceHostname", "SourcePort", "SourceTarget", "SourceUuid", }, se.scalablesolutions.akka.nio.protobuf.RemoteProtocol.RemoteRequest.class, se.scalablesolutions.akka.nio.protobuf.RemoteProtocol.RemoteRequest.Builder.class); internal_static_se_scalablesolutions_akka_nio_protobuf_RemoteReply_descriptor = diff --git a/akka-util-java/src/main/java/se/scalablesolutions/akka/nio/protobuf/RemoteProtocol.proto b/akka-util-java/src/main/java/se/scalablesolutions/akka/nio/protobuf/RemoteProtocol.proto index 1248339b3f..b3d45beb6f 100644 --- a/akka-util-java/src/main/java/se/scalablesolutions/akka/nio/protobuf/RemoteProtocol.proto +++ b/akka-util-java/src/main/java/se/scalablesolutions/akka/nio/protobuf/RemoteProtocol.proto @@ -23,6 +23,10 @@ message RemoteRequest { required bool isActor = 10; required bool isOneWay = 11; required bool isEscaped = 12; + optional string sourceHostname = 13; + optional uint32 sourcePort = 14; + optional string sourceTarget = 15; + optional string sourceUuid = 16; } message RemoteReply {