diff --git a/akka-actors/src/main/scala/actor/Actor.scala b/akka-actors/src/main/scala/actor/Actor.scala
index 17d158a467..6585bfaf41 100644
--- a/akka-actors/src/main/scala/actor/Actor.scala
+++ b/akka-actors/src/main/scala/actor/Actor.scala
@@ -71,6 +71,8 @@ class ActorMessageInvoker(val actor: Actor) extends MessageInvoker {
object Actor extends Logging {
val TIMEOUT = config.getInt("akka.actor.timeout", 5000)
val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false)
+ val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost")
+ val PORT = config.getInt("akka.remote.server.port", 9999)
object Sender extends Actor {
implicit val Self: AnyRef = this
@@ -235,7 +237,7 @@ trait Actor extends TransactionManagement {
private[akka] var _remoteAddress: Option[InetSocketAddress] = None
private[akka] var _linkedActors: Option[HashSet[Actor]] = None
private[akka] var _supervisor: Option[Actor] = None
-
+ private[akka] var _contactAddress: Option[InetSocketAddress] = None
private[akka] val _mailbox: Queue[MessageInvocation] = new LinkedList[MessageInvocation]
// ====================================
@@ -563,11 +565,11 @@ trait Actor extends TransactionManagement {
throw new IllegalStateException(
"\n\tNo sender in scope, can't reply. " +
"\n\tYou have probably used the '!' method to either; " +
- "\n\t\t1. Send a message to a remote actor" +
+ "\n\t\t1. Send a message to a remote actor which does not have a contact address." +
"\n\t\t2. Send a message from an instance that is *not* an actor" +
"\n\t\t3. Send a message to an Active Object annotated with the '@oneway' annotation? " +
"\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future" +
- "\n\tthat will be bound by the argument passed to 'reply'." )
+ "\n\tthat will be bound by the argument passed to 'reply'. Alternatively, you can use setContactAddress to make sure the actor can be contacted over the network." )
case Some(future) =>
future.completeWithResult(message)
}
@@ -609,6 +611,17 @@ trait Actor extends TransactionManagement {
_remoteAddress = Some(address)
}
+ /**
+ * Set the contact address for this actor. This is used for replying to messages sent asynchronously when no reply channel exists.
+ */
+ def setContactAddress(hostname:String, port:Int): Unit = {
+ setContactAddress(new InetSocketAddress(hostname, port))
+ }
+
+ def setContactAddress(address: InetSocketAddress): Unit = {
+ _contactAddress = Some(address)
+ }
+
/**
* Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists.
* However, it will always participate in an existing transaction.
@@ -770,6 +783,27 @@ trait Actor extends TransactionManagement {
.setIsEscaped(false)
val id = registerSupervisorAsRemoteActor
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
+
+ // set the source fields used to reply back to the original sender
+ // (i.e. not the remote proxy actor)
+ if(sender.isDefined) {
+ requestBuilder.setSourceTarget(sender.get.getClass.getName)
+ requestBuilder.setSourceUuid(sender.get.uuid)
+ log.debug("Setting sending actor as " + sender.get.getClass.getName + ", " + _contactAddress)
+
+ if (sender.get._contactAddress.isDefined) {
+ val addr = sender.get._contactAddress.get
+ requestBuilder.setSourceHostname(addr.getHostName())
+ requestBuilder.setSourcePort(addr.getPort())
+ } else {
+ // set the contact address to the default values from the
+ // configuration file
+ requestBuilder.setSourceHostname(Actor.HOSTNAME)
+ requestBuilder.setSourcePort(Actor.PORT)
+ }
+
+ }
+
RemoteProtocolBuilder.setMessage(message, requestBuilder)
RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build)
} else {
diff --git a/akka-actors/src/main/scala/dispatch/Reactor.scala b/akka-actors/src/main/scala/dispatch/Reactor.scala
index 339bed0fca..8b12b0b5bc 100644
--- a/akka-actors/src/main/scala/dispatch/Reactor.scala
+++ b/akka-actors/src/main/scala/dispatch/Reactor.scala
@@ -27,7 +27,7 @@ final class MessageInvocation(val receiver: Actor,
override def hashCode(): Int = synchronized {
var result = HashCode.SEED
result = HashCode.hash(result, receiver)
- result = HashCode.hash(result, message)
+ result = HashCode.hash(result, message.asInstanceOf[AnyRef])
result
}
diff --git a/akka-actors/src/main/scala/nio/RemoteClient.scala b/akka-actors/src/main/scala/nio/RemoteClient.scala
index 3743699b3f..1daca82a6b 100644
--- a/akka-actors/src/main/scala/nio/RemoteClient.scala
+++ b/akka-actors/src/main/scala/nio/RemoteClient.scala
@@ -32,7 +32,7 @@ object RemoteClient extends Logging {
val RECONNECT_DELAY = config.getInt("akka.remote.client.reconnect-delay", 5000)
// TODO: add configuration optons: 'HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel)'
- private[akka] val TIMER = new HashedWheelTimer
+// private[akka] val TIMER = new HashedWheelTimer
private val clients = new HashMap[String, RemoteClient]
def clientFor(address: InetSocketAddress): RemoteClient = synchronized {
@@ -47,6 +47,15 @@ object RemoteClient extends Logging {
client
}
}
+
+ /*
+ * Clean-up all open connections
+ */
+ def shutdownAll() = synchronized {
+ clients.foreach({case (addr, client) => client.shutdown})
+ clients.clear
+// TIMER.stop
+ }
}
/**
@@ -66,7 +75,9 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
private val bootstrap = new ClientBootstrap(channelFactory)
- bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors, bootstrap))
+ private val timer = new HashedWheelTimer
+
+ bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors, bootstrap, timer))
bootstrap.setOption("tcpNoDelay", true)
bootstrap.setOption("keepAlive", true)
@@ -91,6 +102,8 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
connection.getChannel.getCloseFuture.awaitUninterruptibly
channelFactory.releaseExternalResources
}
+
+ timer.stop
}
def send(request: RemoteRequest): Option[CompletableFutureResult] = if (isRunning) {
@@ -124,10 +137,11 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
class RemoteClientPipelineFactory(name: String,
futures: ConcurrentMap[Long, CompletableFutureResult],
supervisors: ConcurrentMap[String, Actor],
- bootstrap: ClientBootstrap) extends ChannelPipelineFactory {
+ bootstrap: ClientBootstrap,
+ timer: HashedWheelTimer) extends ChannelPipelineFactory {
def getPipeline: ChannelPipeline = {
val pipeline = Channels.pipeline()
- pipeline.addLast("timeout", new ReadTimeoutHandler(RemoteClient.TIMER, RemoteClient.READ_TIMEOUT))
+ pipeline.addLast("timeout", new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT))
RemoteServer.COMPRESSION_SCHEME match {
case "zlib" => pipeline.addLast("zlibDecoder", new ZlibDecoder)
//case "lzf" => pipeline.addLast("lzfDecoder", new LzfDecoder)
@@ -142,7 +156,7 @@ class RemoteClientPipelineFactory(name: String,
}
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4))
pipeline.addLast("protobufEncoder", new ProtobufEncoder())
- pipeline.addLast("handler", new RemoteClientHandler(name, futures, supervisors, bootstrap))
+ pipeline.addLast("handler", new RemoteClientHandler(name, futures, supervisors, bootstrap, timer))
pipeline
}
}
@@ -154,7 +168,8 @@ class RemoteClientPipelineFactory(name: String,
class RemoteClientHandler(val name: String,
val futures: ConcurrentMap[Long, CompletableFutureResult],
val supervisors: ConcurrentMap[String, Actor],
- val bootstrap: ClientBootstrap)
+ val bootstrap: ClientBootstrap,
+ val timer: HashedWheelTimer)
extends SimpleChannelUpstreamHandler with Logging {
import Actor.Sender.Self
@@ -196,7 +211,7 @@ class RemoteClientHandler(val name: String,
}
override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
- RemoteClient.TIMER.newTimeout(new TimerTask() {
+ timer.newTimeout(new TimerTask() {
def run(timeout: Timeout) = {
log.debug("Remote client reconnecting to [%s]", ctx.getChannel.getRemoteAddress)
bootstrap.connect
diff --git a/akka-actors/src/main/scala/nio/RemoteServer.scala b/akka-actors/src/main/scala/nio/RemoteServer.scala
index 5a542268c8..4c5a6dd701 100755
--- a/akka-actors/src/main/scala/nio/RemoteServer.scala
+++ b/akka-actors/src/main/scala/nio/RemoteServer.scala
@@ -15,6 +15,7 @@ import se.scalablesolutions.akka.Config.config
import org.jboss.netty.bootstrap.ServerBootstrap
import org.jboss.netty.channel._
+import org.jboss.netty.channel.group.{DefaultChannelGroup, ChannelGroup}
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, LengthFieldPrepender}
import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder}
@@ -79,7 +80,7 @@ class RemoteServer extends Logging {
private var hostname = RemoteServer.HOSTNAME
private var port = RemoteServer.PORT
-
+
@volatile private var isRunning = false
@volatile private var isConfigured = false
@@ -89,6 +90,9 @@ class RemoteServer extends Logging {
private val bootstrap = new ServerBootstrap(factory)
+ // group of open channels, used for clean-up
+ private val openChannels:ChannelGroup = new DefaultChannelGroup("akka-server")
+
def start: Unit = start(None)
def start(loader: Option[ClassLoader]): Unit = start(hostname, port, loader)
@@ -100,19 +104,20 @@ class RemoteServer extends Logging {
hostname = _hostname
port = _port
log.info("Starting remote server at [%s:%s]", hostname, port)
- bootstrap.setPipelineFactory(new RemoteServerPipelineFactory(name, loader))
+ bootstrap.setPipelineFactory(new RemoteServerPipelineFactory(name, openChannels, loader))
// FIXME make these RemoteServer options configurable
bootstrap.setOption("child.tcpNoDelay", true)
bootstrap.setOption("child.keepAlive", true)
bootstrap.setOption("child.reuseAddress", true)
bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS)
- bootstrap.bind(new InetSocketAddress(hostname, port))
+ openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port)))
isRunning = true
}
}
def shutdown = {
+ openChannels.close.awaitUninterruptibly()
bootstrap.releaseExternalResources
}
}
@@ -120,7 +125,7 @@ class RemoteServer extends Logging {
/**
* @author Jonas Bonér
*/
-class RemoteServerPipelineFactory(name: String, loader: Option[ClassLoader])
+class RemoteServerPipelineFactory(name: String, openChannels: ChannelGroup, loader: Option[ClassLoader])
extends ChannelPipelineFactory {
import RemoteServer._
@@ -140,7 +145,7 @@ class RemoteServerPipelineFactory(name: String, loader: Option[ClassLoader])
}
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4))
pipeline.addLast("protobufEncoder", new ProtobufEncoder)
- pipeline.addLast("handler", new RemoteServerHandler(name, loader))
+ pipeline.addLast("handler", new RemoteServerHandler(name, openChannels, loader))
pipeline
}
}
@@ -149,13 +154,22 @@ class RemoteServerPipelineFactory(name: String, loader: Option[ClassLoader])
* @author Jonas Bonér
*/
@ChannelPipelineCoverage { val value = "all" }
-class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassLoader])
+class RemoteServerHandler(val name: String, openChannels: ChannelGroup, val applicationLoader: Option[ClassLoader])
extends SimpleChannelUpstreamHandler with Logging {
val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
private val activeObjects = new ConcurrentHashMap[String, AnyRef]
private val actors = new ConcurrentHashMap[String, Actor]
+ /*
+ * channelOpen overriden to store open channels for a clean shutdown
+ * of a RemoteServer. If a channel is closed before, it is
+ * automatically removed from the open channels group.
+ */
+ override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) {
+ openChannels.add(ctx.getChannel)
+ }
+
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
if (event.isInstanceOf[ChannelStateEvent] &&
event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) {
@@ -190,7 +204,25 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL
actor.start
val message = RemoteProtocolBuilder.getMessage(request)
if (request.getIsOneWay) {
- actor.send(message)
+ if(request.hasSourceHostname && request.hasSourcePort) {
+ // re-create the sending actor
+ val targetClass = if(request.hasSourceTarget) request.getSourceTarget
+ else request.getTarget
+/* val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(targetClass)
+ else Class.forName(targetClass)
+ val remoteActor = clazz.newInstance.asInstanceOf[Actor]
+ log.debug("Re-creating sending actor [%s]", targetClass)
+ remoteActor._uuid = request.getSourceUuid
+ remoteActor.timeout = request.getTimeout
+*/
+ val remoteActor = createActor(targetClass, request.getSourceUuid, request.getTimeout)
+ remoteActor.makeRemote(request.getSourceHostname, request.getSourcePort)
+ remoteActor.start
+ actor.!(message)(remoteActor)
+ } else {
+ // couldnt find a way to reply, send the message without a source/sender
+ actor.send(message)
+ }
}
else {
try {
diff --git a/akka-actors/src/test/scala/RemoteActorTest.scala b/akka-actors/src/test/scala/RemoteActorTest.scala
index e2537ce9fd..51b1882342 100644
--- a/akka-actors/src/test/scala/RemoteActorTest.scala
+++ b/akka-actors/src/test/scala/RemoteActorTest.scala
@@ -4,13 +4,14 @@ import java.util.concurrent.TimeUnit
import junit.framework.TestCase
import org.scalatest.junit.JUnitSuite
-import org.junit.Test
+import org.junit.{Test, Before, After}
-import se.scalablesolutions.akka.nio.{RemoteNode, RemoteServer}
+import se.scalablesolutions.akka.nio.{RemoteServer, RemoteClient}
import se.scalablesolutions.akka.dispatch.Dispatchers
object Global {
var oneWay = "nada"
+ var remoteReply = "nada"
}
class RemoteActorSpecActorUnidirectional extends Actor {
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
@@ -22,8 +23,6 @@ class RemoteActorSpecActorUnidirectional extends Actor {
}
class RemoteActorSpecActorBidirectional extends Actor {
- dispatcher = Dispatchers.newThreadBasedDispatcher(this)
-
def receive = {
case "Hello" =>
reply("World")
@@ -32,23 +31,58 @@ class RemoteActorSpecActorBidirectional extends Actor {
}
}
+case class Send(actor:Actor)
+
+class RemoteActorSpecActorAsyncSender extends Actor {
+ def receive = {
+ case Send(actor:Actor) =>
+ actor ! "Hello"
+ case "World" =>
+ Global.remoteReply = "replied"
+ }
+
+ def send(actor:Actor) {
+ this ! Send(actor)
+ }
+}
+
class RemoteActorTest extends JUnitSuite {
import Actor.Sender.Self
akka.Config.config
- new Thread(new Runnable() {
- def run = {
- RemoteNode.start
- }
- }).start
- Thread.sleep(1000)
-
+
+ val HOSTNAME = "localhost"
+ val PORT1 = 9990
+ val PORT2 = 9991
+ var s1:RemoteServer = null
+ var s2:RemoteServer = null
+
+ @Before
+ def init() {
+ s1 = new RemoteServer()
+ s2 = new RemoteServer()
+
+ s1.start(HOSTNAME, PORT1)
+ s2.start(HOSTNAME, PORT2)
+ Thread.sleep(1000)
+ }
+
private val unit = TimeUnit.MILLISECONDS
+ // make sure the servers shutdown cleanly after the test has
+ // finished
+ @After
+ def finished() {
+ s1.shutdown
+ s2.shutdown
+ RemoteClient.shutdownAll
+ Thread.sleep(1000)
+ }
+
@Test
def shouldSendOneWay = {
val actor = new RemoteActorSpecActorUnidirectional
- actor.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
+ actor.makeRemote(HOSTNAME, PORT1)
actor.start
val result = actor ! "OneWay"
Thread.sleep(100)
@@ -59,18 +93,54 @@ class RemoteActorTest extends JUnitSuite {
@Test
def shouldSendReplyAsync = {
val actor = new RemoteActorSpecActorBidirectional
- actor.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
+ actor.makeRemote(HOSTNAME, PORT1)
actor.start
val result = actor !! "Hello"
assert("World" === result.get.asInstanceOf[String])
actor.stop
}
+ @Test
+ def shouldSendRemoteReply = {
+ implicit val timeout = 500000000L
+ val actor = new RemoteActorSpecActorBidirectional
+ actor.makeRemote(HOSTNAME, PORT2)
+ actor.start
+
+ val sender = new RemoteActorSpecActorAsyncSender
+ sender.setContactAddress(HOSTNAME, PORT1)
+ sender.start
+ sender.send(actor)
+ Thread.sleep(500)
+ assert("replied" === Global.remoteReply)
+ actor.stop
+ }
+
+/*
+ This test does not throw an exception since the
+ _contactAddress is always defined via the
+ global configuration if not set explicitly.
+
+ @Test
+ def shouldSendRemoteReplyException = {
+ implicit val timeout = 500000000L
+ val actor = new RemoteActorSpecActorBidirectional
+ actor.makeRemote(HOSTNAME, PORT1)
+ actor.start
+
+ val sender = new RemoteActorSpecActorAsyncSender
+ sender.start
+ sender.send(actor)
+ Thread.sleep(500)
+ assert("exception" === Global.remoteReply)
+ actor.stop
+ }
+*/
@Test
def shouldSendReceiveException = {
implicit val timeout = 500000000L
val actor = new RemoteActorSpecActorBidirectional
- actor.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
+ actor.makeRemote(HOSTNAME, PORT1)
actor.start
try {
actor !! "Failure"
diff --git a/akka-util-java/src/main/java/se/scalablesolutions/akka/nio/protobuf/RemoteProtocol.java b/akka-util-java/src/main/java/se/scalablesolutions/akka/nio/protobuf/RemoteProtocol.java
index 950cfeb918..0386755ba5 100644
--- a/akka-util-java/src/main/java/se/scalablesolutions/akka/nio/protobuf/RemoteProtocol.java
+++ b/akka-util-java/src/main/java/se/scalablesolutions/akka/nio/protobuf/RemoteProtocol.java
@@ -115,6 +115,34 @@ public final class RemoteProtocol {
public boolean hasIsEscaped() { return hasIsEscaped; }
public boolean getIsEscaped() { return isEscaped_; }
+ // optional string sourceHostname = 13;
+ public static final int SOURCEHOSTNAME_FIELD_NUMBER = 13;
+ private boolean hasSourceHostname;
+ private java.lang.String sourceHostname_ = "";
+ public boolean hasSourceHostname() { return hasSourceHostname; }
+ public java.lang.String getSourceHostname() { return sourceHostname_; }
+
+ // optional uint32 sourcePort = 14;
+ public static final int SOURCEPORT_FIELD_NUMBER = 14;
+ private boolean hasSourcePort;
+ private int sourcePort_ = 0;
+ public boolean hasSourcePort() { return hasSourcePort; }
+ public int getSourcePort() { return sourcePort_; }
+
+ // optional string sourceTarget = 15;
+ public static final int SOURCETARGET_FIELD_NUMBER = 15;
+ private boolean hasSourceTarget;
+ private java.lang.String sourceTarget_ = "";
+ public boolean hasSourceTarget() { return hasSourceTarget; }
+ public java.lang.String getSourceTarget() { return sourceTarget_; }
+
+ // optional string sourceUuid = 16;
+ public static final int SOURCEUUID_FIELD_NUMBER = 16;
+ private boolean hasSourceUuid;
+ private java.lang.String sourceUuid_ = "";
+ public boolean hasSourceUuid() { return hasSourceUuid; }
+ public java.lang.String getSourceUuid() { return sourceUuid_; }
+
public final boolean isInitialized() {
if (!hasId) return false;
if (!hasProtocol) return false;
@@ -166,6 +194,18 @@ public final class RemoteProtocol {
if (hasIsEscaped()) {
output.writeBool(12, getIsEscaped());
}
+ if (hasSourceHostname()) {
+ output.writeString(13, getSourceHostname());
+ }
+ if (hasSourcePort()) {
+ output.writeUInt32(14, getSourcePort());
+ }
+ if (hasSourceTarget()) {
+ output.writeString(15, getSourceTarget());
+ }
+ if (hasSourceUuid()) {
+ output.writeString(16, getSourceUuid());
+ }
getUnknownFields().writeTo(output);
}
@@ -223,6 +263,22 @@ public final class RemoteProtocol {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(12, getIsEscaped());
}
+ if (hasSourceHostname()) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeStringSize(13, getSourceHostname());
+ }
+ if (hasSourcePort()) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeUInt32Size(14, getSourcePort());
+ }
+ if (hasSourceTarget()) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeStringSize(15, getSourceTarget());
+ }
+ if (hasSourceUuid()) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeStringSize(16, getSourceUuid());
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -408,6 +464,18 @@ public final class RemoteProtocol {
if (other.hasIsEscaped()) {
setIsEscaped(other.getIsEscaped());
}
+ if (other.hasSourceHostname()) {
+ setSourceHostname(other.getSourceHostname());
+ }
+ if (other.hasSourcePort()) {
+ setSourcePort(other.getSourcePort());
+ }
+ if (other.hasSourceTarget()) {
+ setSourceTarget(other.getSourceTarget());
+ }
+ if (other.hasSourceUuid()) {
+ setSourceUuid(other.getSourceUuid());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -481,6 +549,22 @@ public final class RemoteProtocol {
setIsEscaped(input.readBool());
break;
}
+ case 106: {
+ setSourceHostname(input.readString());
+ break;
+ }
+ case 112: {
+ setSourcePort(input.readUInt32());
+ break;
+ }
+ case 122: {
+ setSourceTarget(input.readString());
+ break;
+ }
+ case 130: {
+ setSourceUuid(input.readString());
+ break;
+ }
}
}
}
@@ -719,6 +803,87 @@ public final class RemoteProtocol {
result.isEscaped_ = false;
return this;
}
+
+ // optional string sourceHostname = 13;
+ public boolean hasSourceHostname() {
+ return result.hasSourceHostname();
+ }
+ public java.lang.String getSourceHostname() {
+ return result.getSourceHostname();
+ }
+ public Builder setSourceHostname(java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ result.hasSourceHostname = true;
+ result.sourceHostname_ = value;
+ return this;
+ }
+ public Builder clearSourceHostname() {
+ result.hasSourceHostname = false;
+ result.sourceHostname_ = getDefaultInstance().getSourceHostname();
+ return this;
+ }
+
+ // optional uint32 sourcePort = 14;
+ public boolean hasSourcePort() {
+ return result.hasSourcePort();
+ }
+ public int getSourcePort() {
+ return result.getSourcePort();
+ }
+ public Builder setSourcePort(int value) {
+ result.hasSourcePort = true;
+ result.sourcePort_ = value;
+ return this;
+ }
+ public Builder clearSourcePort() {
+ result.hasSourcePort = false;
+ result.sourcePort_ = 0;
+ return this;
+ }
+
+ // optional string sourceTarget = 15;
+ public boolean hasSourceTarget() {
+ return result.hasSourceTarget();
+ }
+ public java.lang.String getSourceTarget() {
+ return result.getSourceTarget();
+ }
+ public Builder setSourceTarget(java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ result.hasSourceTarget = true;
+ result.sourceTarget_ = value;
+ return this;
+ }
+ public Builder clearSourceTarget() {
+ result.hasSourceTarget = false;
+ result.sourceTarget_ = getDefaultInstance().getSourceTarget();
+ return this;
+ }
+
+ // optional string sourceUuid = 16;
+ public boolean hasSourceUuid() {
+ return result.hasSourceUuid();
+ }
+ public java.lang.String getSourceUuid() {
+ return result.getSourceUuid();
+ }
+ public Builder setSourceUuid(java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ result.hasSourceUuid = true;
+ result.sourceUuid_ = value;
+ return this;
+ }
+ public Builder clearSourceUuid() {
+ result.hasSourceUuid = false;
+ result.sourceUuid_ = getDefaultInstance().getSourceUuid();
+ return this;
+ }
}
static {
@@ -1306,17 +1471,19 @@ public final class RemoteProtocol {
java.lang.String[] descriptorData = {
"\n;se/scalablesolutions/akka/nio/protobuf" +
"/RemoteProtocol.proto\022&se.scalablesoluti" +
- "ons.akka.nio.protobuf\"\344\001\n\rRemoteRequest\022" +
+ "ons.akka.nio.protobuf\"\272\002\n\rRemoteRequest\022" +
"\n\n\002id\030\001 \002(\004\022\020\n\010protocol\030\002 \002(\r\022\017\n\007message" +
"\030\003 \002(\014\022\027\n\017messageManifest\030\004 \001(\014\022\016\n\006metho" +
"d\030\005 \001(\t\022\016\n\006target\030\006 \002(\t\022\014\n\004uuid\030\007 \002(\t\022\017\n" +
"\007timeout\030\010 \002(\004\022\026\n\016supervisorUuid\030\t \001(\t\022\017" +
"\n\007isActor\030\n \002(\010\022\020\n\010isOneWay\030\013 \002(\010\022\021\n\tisE" +
- "scaped\030\014 \002(\010\"\247\001\n\013RemoteReply\022\n\n\002id\030\001 \002(\004" +
- "\022\020\n\010protocol\030\002 \001(\r\022\017\n\007message\030\003 \001(\014\022\027\n\017m",
- "essageManifest\030\004 \001(\014\022\021\n\texception\030\005 \001(\t\022" +
- "\026\n\016supervisorUuid\030\006 \001(\t\022\017\n\007isActor\030\007 \002(\010" +
- "\022\024\n\014isSuccessful\030\010 \002(\010"
+ "scaped\030\014 \002(\010\022\026\n\016sourceHostname\030\r \001(\t\022\022\n\n" +
+ "sourcePort\030\016 \001(\r\022\024\n\014sourceTarget\030\017 \001(\t\022\022",
+ "\n\nsourceUuid\030\020 \001(\t\"\247\001\n\013RemoteReply\022\n\n\002id" +
+ "\030\001 \002(\004\022\020\n\010protocol\030\002 \001(\r\022\017\n\007message\030\003 \001(" +
+ "\014\022\027\n\017messageManifest\030\004 \001(\014\022\021\n\texception\030" +
+ "\005 \001(\t\022\026\n\016supervisorUuid\030\006 \001(\t\022\017\n\007isActor" +
+ "\030\007 \002(\010\022\024\n\014isSuccessful\030\010 \002(\010"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -1328,7 +1495,7 @@ public final class RemoteProtocol {
internal_static_se_scalablesolutions_akka_nio_protobuf_RemoteRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_se_scalablesolutions_akka_nio_protobuf_RemoteRequest_descriptor,
- new java.lang.String[] { "Id", "Protocol", "Message", "MessageManifest", "Method", "Target", "Uuid", "Timeout", "SupervisorUuid", "IsActor", "IsOneWay", "IsEscaped", },
+ new java.lang.String[] { "Id", "Protocol", "Message", "MessageManifest", "Method", "Target", "Uuid", "Timeout", "SupervisorUuid", "IsActor", "IsOneWay", "IsEscaped", "SourceHostname", "SourcePort", "SourceTarget", "SourceUuid", },
se.scalablesolutions.akka.nio.protobuf.RemoteProtocol.RemoteRequest.class,
se.scalablesolutions.akka.nio.protobuf.RemoteProtocol.RemoteRequest.Builder.class);
internal_static_se_scalablesolutions_akka_nio_protobuf_RemoteReply_descriptor =
diff --git a/akka-util-java/src/main/java/se/scalablesolutions/akka/nio/protobuf/RemoteProtocol.proto b/akka-util-java/src/main/java/se/scalablesolutions/akka/nio/protobuf/RemoteProtocol.proto
index 1248339b3f..b3d45beb6f 100644
--- a/akka-util-java/src/main/java/se/scalablesolutions/akka/nio/protobuf/RemoteProtocol.proto
+++ b/akka-util-java/src/main/java/se/scalablesolutions/akka/nio/protobuf/RemoteProtocol.proto
@@ -23,6 +23,10 @@ message RemoteRequest {
required bool isActor = 10;
required bool isOneWay = 11;
required bool isEscaped = 12;
+ optional string sourceHostname = 13;
+ optional uint32 sourcePort = 14;
+ optional string sourceTarget = 15;
+ optional string sourceUuid = 16;
}
message RemoteReply {