- Support for implicit sender with remote actors (fixes Issue #71)

- The RemoteServer and RemoteClient was modified to support a clean shutdown when testing using multiple remote servers
This commit is contained in:
Mikael Högqvist 2009-12-14 19:22:37 +01:00
parent 3de15e3590
commit ea8963ef6e
7 changed files with 361 additions and 39 deletions

View file

@ -71,6 +71,8 @@ class ActorMessageInvoker(val actor: Actor) extends MessageInvoker {
object Actor extends Logging {
val TIMEOUT = config.getInt("akka.actor.timeout", 5000)
val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false)
val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost")
val PORT = config.getInt("akka.remote.server.port", 9999)
object Sender extends Actor {
implicit val Self: AnyRef = this
@ -235,7 +237,7 @@ trait Actor extends TransactionManagement {
private[akka] var _remoteAddress: Option[InetSocketAddress] = None
private[akka] var _linkedActors: Option[HashSet[Actor]] = None
private[akka] var _supervisor: Option[Actor] = None
private[akka] var _contactAddress: Option[InetSocketAddress] = None
private[akka] val _mailbox: Queue[MessageInvocation] = new LinkedList[MessageInvocation]
// ====================================
@ -563,11 +565,11 @@ trait Actor extends TransactionManagement {
throw new IllegalStateException(
"\n\tNo sender in scope, can't reply. " +
"\n\tYou have probably used the '!' method to either; " +
"\n\t\t1. Send a message to a remote actor" +
"\n\t\t1. Send a message to a remote actor which does not have a contact address." +
"\n\t\t2. Send a message from an instance that is *not* an actor" +
"\n\t\t3. Send a message to an Active Object annotated with the '@oneway' annotation? " +
"\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future" +
"\n\tthat will be bound by the argument passed to 'reply'." )
"\n\tthat will be bound by the argument passed to 'reply'. Alternatively, you can use setContactAddress to make sure the actor can be contacted over the network." )
case Some(future) =>
future.completeWithResult(message)
}
@ -609,6 +611,17 @@ trait Actor extends TransactionManagement {
_remoteAddress = Some(address)
}
/**
* Set the contact address for this actor. This is used for replying to messages sent asynchronously when no reply channel exists.
*/
def setContactAddress(hostname:String, port:Int): Unit = {
setContactAddress(new InetSocketAddress(hostname, port))
}
def setContactAddress(address: InetSocketAddress): Unit = {
_contactAddress = Some(address)
}
/**
* Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists.
* However, it will always participate in an existing transaction.
@ -770,6 +783,27 @@ trait Actor extends TransactionManagement {
.setIsEscaped(false)
val id = registerSupervisorAsRemoteActor
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
// set the source fields used to reply back to the original sender
// (i.e. not the remote proxy actor)
if(sender.isDefined) {
requestBuilder.setSourceTarget(sender.get.getClass.getName)
requestBuilder.setSourceUuid(sender.get.uuid)
log.debug("Setting sending actor as " + sender.get.getClass.getName + ", " + _contactAddress)
if (sender.get._contactAddress.isDefined) {
val addr = sender.get._contactAddress.get
requestBuilder.setSourceHostname(addr.getHostName())
requestBuilder.setSourcePort(addr.getPort())
} else {
// set the contact address to the default values from the
// configuration file
requestBuilder.setSourceHostname(Actor.HOSTNAME)
requestBuilder.setSourcePort(Actor.PORT)
}
}
RemoteProtocolBuilder.setMessage(message, requestBuilder)
RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build)
} else {

View file

@ -27,7 +27,7 @@ final class MessageInvocation(val receiver: Actor,
override def hashCode(): Int = synchronized {
var result = HashCode.SEED
result = HashCode.hash(result, receiver)
result = HashCode.hash(result, message)
result = HashCode.hash(result, message.asInstanceOf[AnyRef])
result
}

View file

@ -32,7 +32,7 @@ object RemoteClient extends Logging {
val RECONNECT_DELAY = config.getInt("akka.remote.client.reconnect-delay", 5000)
// TODO: add configuration optons: 'HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel)'
private[akka] val TIMER = new HashedWheelTimer
// private[akka] val TIMER = new HashedWheelTimer
private val clients = new HashMap[String, RemoteClient]
def clientFor(address: InetSocketAddress): RemoteClient = synchronized {
@ -47,6 +47,15 @@ object RemoteClient extends Logging {
client
}
}
/*
* Clean-up all open connections
*/
def shutdownAll() = synchronized {
clients.foreach({case (addr, client) => client.shutdown})
clients.clear
// TIMER.stop
}
}
/**
@ -66,7 +75,9 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
private val bootstrap = new ClientBootstrap(channelFactory)
bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors, bootstrap))
private val timer = new HashedWheelTimer
bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors, bootstrap, timer))
bootstrap.setOption("tcpNoDelay", true)
bootstrap.setOption("keepAlive", true)
@ -91,6 +102,8 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
connection.getChannel.getCloseFuture.awaitUninterruptibly
channelFactory.releaseExternalResources
}
timer.stop
}
def send(request: RemoteRequest): Option[CompletableFutureResult] = if (isRunning) {
@ -124,10 +137,11 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
class RemoteClientPipelineFactory(name: String,
futures: ConcurrentMap[Long, CompletableFutureResult],
supervisors: ConcurrentMap[String, Actor],
bootstrap: ClientBootstrap) extends ChannelPipelineFactory {
bootstrap: ClientBootstrap,
timer: HashedWheelTimer) extends ChannelPipelineFactory {
def getPipeline: ChannelPipeline = {
val pipeline = Channels.pipeline()
pipeline.addLast("timeout", new ReadTimeoutHandler(RemoteClient.TIMER, RemoteClient.READ_TIMEOUT))
pipeline.addLast("timeout", new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT))
RemoteServer.COMPRESSION_SCHEME match {
case "zlib" => pipeline.addLast("zlibDecoder", new ZlibDecoder)
//case "lzf" => pipeline.addLast("lzfDecoder", new LzfDecoder)
@ -142,7 +156,7 @@ class RemoteClientPipelineFactory(name: String,
}
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4))
pipeline.addLast("protobufEncoder", new ProtobufEncoder())
pipeline.addLast("handler", new RemoteClientHandler(name, futures, supervisors, bootstrap))
pipeline.addLast("handler", new RemoteClientHandler(name, futures, supervisors, bootstrap, timer))
pipeline
}
}
@ -154,7 +168,8 @@ class RemoteClientPipelineFactory(name: String,
class RemoteClientHandler(val name: String,
val futures: ConcurrentMap[Long, CompletableFutureResult],
val supervisors: ConcurrentMap[String, Actor],
val bootstrap: ClientBootstrap)
val bootstrap: ClientBootstrap,
val timer: HashedWheelTimer)
extends SimpleChannelUpstreamHandler with Logging {
import Actor.Sender.Self
@ -196,7 +211,7 @@ class RemoteClientHandler(val name: String,
}
override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
RemoteClient.TIMER.newTimeout(new TimerTask() {
timer.newTimeout(new TimerTask() {
def run(timeout: Timeout) = {
log.debug("Remote client reconnecting to [%s]", ctx.getChannel.getRemoteAddress)
bootstrap.connect

View file

@ -15,6 +15,7 @@ import se.scalablesolutions.akka.Config.config
import org.jboss.netty.bootstrap.ServerBootstrap
import org.jboss.netty.channel._
import org.jboss.netty.channel.group.{DefaultChannelGroup, ChannelGroup}
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, LengthFieldPrepender}
import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder}
@ -79,7 +80,7 @@ class RemoteServer extends Logging {
private var hostname = RemoteServer.HOSTNAME
private var port = RemoteServer.PORT
@volatile private var isRunning = false
@volatile private var isConfigured = false
@ -89,6 +90,9 @@ class RemoteServer extends Logging {
private val bootstrap = new ServerBootstrap(factory)
// group of open channels, used for clean-up
private val openChannels:ChannelGroup = new DefaultChannelGroup("akka-server")
def start: Unit = start(None)
def start(loader: Option[ClassLoader]): Unit = start(hostname, port, loader)
@ -100,19 +104,20 @@ class RemoteServer extends Logging {
hostname = _hostname
port = _port
log.info("Starting remote server at [%s:%s]", hostname, port)
bootstrap.setPipelineFactory(new RemoteServerPipelineFactory(name, loader))
bootstrap.setPipelineFactory(new RemoteServerPipelineFactory(name, openChannels, loader))
// FIXME make these RemoteServer options configurable
bootstrap.setOption("child.tcpNoDelay", true)
bootstrap.setOption("child.keepAlive", true)
bootstrap.setOption("child.reuseAddress", true)
bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS)
bootstrap.bind(new InetSocketAddress(hostname, port))
openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port)))
isRunning = true
}
}
def shutdown = {
openChannels.close.awaitUninterruptibly()
bootstrap.releaseExternalResources
}
}
@ -120,7 +125,7 @@ class RemoteServer extends Logging {
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RemoteServerPipelineFactory(name: String, loader: Option[ClassLoader])
class RemoteServerPipelineFactory(name: String, openChannels: ChannelGroup, loader: Option[ClassLoader])
extends ChannelPipelineFactory {
import RemoteServer._
@ -140,7 +145,7 @@ class RemoteServerPipelineFactory(name: String, loader: Option[ClassLoader])
}
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4))
pipeline.addLast("protobufEncoder", new ProtobufEncoder)
pipeline.addLast("handler", new RemoteServerHandler(name, loader))
pipeline.addLast("handler", new RemoteServerHandler(name, openChannels, loader))
pipeline
}
}
@ -149,13 +154,22 @@ class RemoteServerPipelineFactory(name: String, loader: Option[ClassLoader])
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@ChannelPipelineCoverage { val value = "all" }
class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassLoader])
class RemoteServerHandler(val name: String, openChannels: ChannelGroup, val applicationLoader: Option[ClassLoader])
extends SimpleChannelUpstreamHandler with Logging {
val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
private val activeObjects = new ConcurrentHashMap[String, AnyRef]
private val actors = new ConcurrentHashMap[String, Actor]
/*
* channelOpen overriden to store open channels for a clean shutdown
* of a RemoteServer. If a channel is closed before, it is
* automatically removed from the open channels group.
*/
override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) {
openChannels.add(ctx.getChannel)
}
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
if (event.isInstanceOf[ChannelStateEvent] &&
event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) {
@ -190,7 +204,25 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL
actor.start
val message = RemoteProtocolBuilder.getMessage(request)
if (request.getIsOneWay) {
actor.send(message)
if(request.hasSourceHostname && request.hasSourcePort) {
// re-create the sending actor
val targetClass = if(request.hasSourceTarget) request.getSourceTarget
else request.getTarget
/* val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(targetClass)
else Class.forName(targetClass)
val remoteActor = clazz.newInstance.asInstanceOf[Actor]
log.debug("Re-creating sending actor [%s]", targetClass)
remoteActor._uuid = request.getSourceUuid
remoteActor.timeout = request.getTimeout
*/
val remoteActor = createActor(targetClass, request.getSourceUuid, request.getTimeout)
remoteActor.makeRemote(request.getSourceHostname, request.getSourcePort)
remoteActor.start
actor.!(message)(remoteActor)
} else {
// couldnt find a way to reply, send the message without a source/sender
actor.send(message)
}
}
else {
try {

View file

@ -4,13 +4,14 @@ import java.util.concurrent.TimeUnit
import junit.framework.TestCase
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import org.junit.{Test, Before, After}
import se.scalablesolutions.akka.nio.{RemoteNode, RemoteServer}
import se.scalablesolutions.akka.nio.{RemoteServer, RemoteClient}
import se.scalablesolutions.akka.dispatch.Dispatchers
object Global {
var oneWay = "nada"
var remoteReply = "nada"
}
class RemoteActorSpecActorUnidirectional extends Actor {
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
@ -22,8 +23,6 @@ class RemoteActorSpecActorUnidirectional extends Actor {
}
class RemoteActorSpecActorBidirectional extends Actor {
dispatcher = Dispatchers.newThreadBasedDispatcher(this)
def receive = {
case "Hello" =>
reply("World")
@ -32,23 +31,58 @@ class RemoteActorSpecActorBidirectional extends Actor {
}
}
case class Send(actor:Actor)
class RemoteActorSpecActorAsyncSender extends Actor {
def receive = {
case Send(actor:Actor) =>
actor ! "Hello"
case "World" =>
Global.remoteReply = "replied"
}
def send(actor:Actor) {
this ! Send(actor)
}
}
class RemoteActorTest extends JUnitSuite {
import Actor.Sender.Self
akka.Config.config
new Thread(new Runnable() {
def run = {
RemoteNode.start
}
}).start
Thread.sleep(1000)
val HOSTNAME = "localhost"
val PORT1 = 9990
val PORT2 = 9991
var s1:RemoteServer = null
var s2:RemoteServer = null
@Before
def init() {
s1 = new RemoteServer()
s2 = new RemoteServer()
s1.start(HOSTNAME, PORT1)
s2.start(HOSTNAME, PORT2)
Thread.sleep(1000)
}
private val unit = TimeUnit.MILLISECONDS
// make sure the servers shutdown cleanly after the test has
// finished
@After
def finished() {
s1.shutdown
s2.shutdown
RemoteClient.shutdownAll
Thread.sleep(1000)
}
@Test
def shouldSendOneWay = {
val actor = new RemoteActorSpecActorUnidirectional
actor.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
actor.makeRemote(HOSTNAME, PORT1)
actor.start
val result = actor ! "OneWay"
Thread.sleep(100)
@ -59,18 +93,54 @@ class RemoteActorTest extends JUnitSuite {
@Test
def shouldSendReplyAsync = {
val actor = new RemoteActorSpecActorBidirectional
actor.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
actor.makeRemote(HOSTNAME, PORT1)
actor.start
val result = actor !! "Hello"
assert("World" === result.get.asInstanceOf[String])
actor.stop
}
@Test
def shouldSendRemoteReply = {
implicit val timeout = 500000000L
val actor = new RemoteActorSpecActorBidirectional
actor.makeRemote(HOSTNAME, PORT2)
actor.start
val sender = new RemoteActorSpecActorAsyncSender
sender.setContactAddress(HOSTNAME, PORT1)
sender.start
sender.send(actor)
Thread.sleep(500)
assert("replied" === Global.remoteReply)
actor.stop
}
/*
This test does not throw an exception since the
_contactAddress is always defined via the
global configuration if not set explicitly.
@Test
def shouldSendRemoteReplyException = {
implicit val timeout = 500000000L
val actor = new RemoteActorSpecActorBidirectional
actor.makeRemote(HOSTNAME, PORT1)
actor.start
val sender = new RemoteActorSpecActorAsyncSender
sender.start
sender.send(actor)
Thread.sleep(500)
assert("exception" === Global.remoteReply)
actor.stop
}
*/
@Test
def shouldSendReceiveException = {
implicit val timeout = 500000000L
val actor = new RemoteActorSpecActorBidirectional
actor.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
actor.makeRemote(HOSTNAME, PORT1)
actor.start
try {
actor !! "Failure"

View file

@ -115,6 +115,34 @@ public final class RemoteProtocol {
public boolean hasIsEscaped() { return hasIsEscaped; }
public boolean getIsEscaped() { return isEscaped_; }
// optional string sourceHostname = 13;
public static final int SOURCEHOSTNAME_FIELD_NUMBER = 13;
private boolean hasSourceHostname;
private java.lang.String sourceHostname_ = "";
public boolean hasSourceHostname() { return hasSourceHostname; }
public java.lang.String getSourceHostname() { return sourceHostname_; }
// optional uint32 sourcePort = 14;
public static final int SOURCEPORT_FIELD_NUMBER = 14;
private boolean hasSourcePort;
private int sourcePort_ = 0;
public boolean hasSourcePort() { return hasSourcePort; }
public int getSourcePort() { return sourcePort_; }
// optional string sourceTarget = 15;
public static final int SOURCETARGET_FIELD_NUMBER = 15;
private boolean hasSourceTarget;
private java.lang.String sourceTarget_ = "";
public boolean hasSourceTarget() { return hasSourceTarget; }
public java.lang.String getSourceTarget() { return sourceTarget_; }
// optional string sourceUuid = 16;
public static final int SOURCEUUID_FIELD_NUMBER = 16;
private boolean hasSourceUuid;
private java.lang.String sourceUuid_ = "";
public boolean hasSourceUuid() { return hasSourceUuid; }
public java.lang.String getSourceUuid() { return sourceUuid_; }
public final boolean isInitialized() {
if (!hasId) return false;
if (!hasProtocol) return false;
@ -166,6 +194,18 @@ public final class RemoteProtocol {
if (hasIsEscaped()) {
output.writeBool(12, getIsEscaped());
}
if (hasSourceHostname()) {
output.writeString(13, getSourceHostname());
}
if (hasSourcePort()) {
output.writeUInt32(14, getSourcePort());
}
if (hasSourceTarget()) {
output.writeString(15, getSourceTarget());
}
if (hasSourceUuid()) {
output.writeString(16, getSourceUuid());
}
getUnknownFields().writeTo(output);
}
@ -223,6 +263,22 @@ public final class RemoteProtocol {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(12, getIsEscaped());
}
if (hasSourceHostname()) {
size += com.google.protobuf.CodedOutputStream
.computeStringSize(13, getSourceHostname());
}
if (hasSourcePort()) {
size += com.google.protobuf.CodedOutputStream
.computeUInt32Size(14, getSourcePort());
}
if (hasSourceTarget()) {
size += com.google.protobuf.CodedOutputStream
.computeStringSize(15, getSourceTarget());
}
if (hasSourceUuid()) {
size += com.google.protobuf.CodedOutputStream
.computeStringSize(16, getSourceUuid());
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@ -408,6 +464,18 @@ public final class RemoteProtocol {
if (other.hasIsEscaped()) {
setIsEscaped(other.getIsEscaped());
}
if (other.hasSourceHostname()) {
setSourceHostname(other.getSourceHostname());
}
if (other.hasSourcePort()) {
setSourcePort(other.getSourcePort());
}
if (other.hasSourceTarget()) {
setSourceTarget(other.getSourceTarget());
}
if (other.hasSourceUuid()) {
setSourceUuid(other.getSourceUuid());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@ -481,6 +549,22 @@ public final class RemoteProtocol {
setIsEscaped(input.readBool());
break;
}
case 106: {
setSourceHostname(input.readString());
break;
}
case 112: {
setSourcePort(input.readUInt32());
break;
}
case 122: {
setSourceTarget(input.readString());
break;
}
case 130: {
setSourceUuid(input.readString());
break;
}
}
}
}
@ -719,6 +803,87 @@ public final class RemoteProtocol {
result.isEscaped_ = false;
return this;
}
// optional string sourceHostname = 13;
public boolean hasSourceHostname() {
return result.hasSourceHostname();
}
public java.lang.String getSourceHostname() {
return result.getSourceHostname();
}
public Builder setSourceHostname(java.lang.String value) {
if (value == null) {
throw new NullPointerException();
}
result.hasSourceHostname = true;
result.sourceHostname_ = value;
return this;
}
public Builder clearSourceHostname() {
result.hasSourceHostname = false;
result.sourceHostname_ = getDefaultInstance().getSourceHostname();
return this;
}
// optional uint32 sourcePort = 14;
public boolean hasSourcePort() {
return result.hasSourcePort();
}
public int getSourcePort() {
return result.getSourcePort();
}
public Builder setSourcePort(int value) {
result.hasSourcePort = true;
result.sourcePort_ = value;
return this;
}
public Builder clearSourcePort() {
result.hasSourcePort = false;
result.sourcePort_ = 0;
return this;
}
// optional string sourceTarget = 15;
public boolean hasSourceTarget() {
return result.hasSourceTarget();
}
public java.lang.String getSourceTarget() {
return result.getSourceTarget();
}
public Builder setSourceTarget(java.lang.String value) {
if (value == null) {
throw new NullPointerException();
}
result.hasSourceTarget = true;
result.sourceTarget_ = value;
return this;
}
public Builder clearSourceTarget() {
result.hasSourceTarget = false;
result.sourceTarget_ = getDefaultInstance().getSourceTarget();
return this;
}
// optional string sourceUuid = 16;
public boolean hasSourceUuid() {
return result.hasSourceUuid();
}
public java.lang.String getSourceUuid() {
return result.getSourceUuid();
}
public Builder setSourceUuid(java.lang.String value) {
if (value == null) {
throw new NullPointerException();
}
result.hasSourceUuid = true;
result.sourceUuid_ = value;
return this;
}
public Builder clearSourceUuid() {
result.hasSourceUuid = false;
result.sourceUuid_ = getDefaultInstance().getSourceUuid();
return this;
}
}
static {
@ -1306,17 +1471,19 @@ public final class RemoteProtocol {
java.lang.String[] descriptorData = {
"\n;se/scalablesolutions/akka/nio/protobuf" +
"/RemoteProtocol.proto\022&se.scalablesoluti" +
"ons.akka.nio.protobuf\"\344\001\n\rRemoteRequest\022" +
"ons.akka.nio.protobuf\"\272\002\n\rRemoteRequest\022" +
"\n\n\002id\030\001 \002(\004\022\020\n\010protocol\030\002 \002(\r\022\017\n\007message" +
"\030\003 \002(\014\022\027\n\017messageManifest\030\004 \001(\014\022\016\n\006metho" +
"d\030\005 \001(\t\022\016\n\006target\030\006 \002(\t\022\014\n\004uuid\030\007 \002(\t\022\017\n" +
"\007timeout\030\010 \002(\004\022\026\n\016supervisorUuid\030\t \001(\t\022\017" +
"\n\007isActor\030\n \002(\010\022\020\n\010isOneWay\030\013 \002(\010\022\021\n\tisE" +
"scaped\030\014 \002(\010\"\247\001\n\013RemoteReply\022\n\n\002id\030\001 \002(\004" +
"\022\020\n\010protocol\030\002 \001(\r\022\017\n\007message\030\003 \001(\014\022\027\n\017m",
"essageManifest\030\004 \001(\014\022\021\n\texception\030\005 \001(\t\022" +
"\026\n\016supervisorUuid\030\006 \001(\t\022\017\n\007isActor\030\007 \002(\010" +
"\022\024\n\014isSuccessful\030\010 \002(\010"
"scaped\030\014 \002(\010\022\026\n\016sourceHostname\030\r \001(\t\022\022\n\n" +
"sourcePort\030\016 \001(\r\022\024\n\014sourceTarget\030\017 \001(\t\022\022",
"\n\nsourceUuid\030\020 \001(\t\"\247\001\n\013RemoteReply\022\n\n\002id" +
"\030\001 \002(\004\022\020\n\010protocol\030\002 \001(\r\022\017\n\007message\030\003 \001(" +
"\014\022\027\n\017messageManifest\030\004 \001(\014\022\021\n\texception\030" +
"\005 \001(\t\022\026\n\016supervisorUuid\030\006 \001(\t\022\017\n\007isActor" +
"\030\007 \002(\010\022\024\n\014isSuccessful\030\010 \002(\010"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -1328,7 +1495,7 @@ public final class RemoteProtocol {
internal_static_se_scalablesolutions_akka_nio_protobuf_RemoteRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_se_scalablesolutions_akka_nio_protobuf_RemoteRequest_descriptor,
new java.lang.String[] { "Id", "Protocol", "Message", "MessageManifest", "Method", "Target", "Uuid", "Timeout", "SupervisorUuid", "IsActor", "IsOneWay", "IsEscaped", },
new java.lang.String[] { "Id", "Protocol", "Message", "MessageManifest", "Method", "Target", "Uuid", "Timeout", "SupervisorUuid", "IsActor", "IsOneWay", "IsEscaped", "SourceHostname", "SourcePort", "SourceTarget", "SourceUuid", },
se.scalablesolutions.akka.nio.protobuf.RemoteProtocol.RemoteRequest.class,
se.scalablesolutions.akka.nio.protobuf.RemoteProtocol.RemoteRequest.Builder.class);
internal_static_se_scalablesolutions_akka_nio_protobuf_RemoteReply_descriptor =

View file

@ -23,6 +23,10 @@ message RemoteRequest {
required bool isActor = 10;
required bool isOneWay = 11;
required bool isEscaped = 12;
optional string sourceHostname = 13;
optional uint32 sourcePort = 14;
optional string sourceTarget = 15;
optional string sourceUuid = 16;
}
message RemoteReply {