From 37ba03eadbe63096f3ab2b6105ca2b227d63647b Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 3 Nov 2011 18:33:57 +0100 Subject: [PATCH] Adding initial support in the protocol to get the public host/port of the connecting remote server --- .../scala/akka/remote/RemoteInterface.scala | 16 +- .../scala/akka/util/ReflectiveAccess.scala | 4 - .../main/java/akka/remote/RemoteProtocol.java | 820 +++++++++++++++--- .../src/main/protocol/RemoteProtocol.proto | 10 +- .../src/main/scala/akka/remote/Remote.scala | 2 +- .../akka/remote/RemoteActorRefProvider.scala | 6 +- .../remote/netty/NettyRemoteSupport.scala | 24 +- 7 files changed, 748 insertions(+), 134 deletions(-) diff --git a/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala index e32421e989..797ea7c1aa 100644 --- a/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remote/RemoteInterface.scala @@ -21,10 +21,6 @@ import java.lang.reflect.InvocationTargetException class RemoteException(message: String) extends AkkaException(message) -trait RemoteService { - def server: RemoteSupport -} - trait RemoteModule { protected[akka] def notifyListeners(message: ⇒ Any): Unit } @@ -174,9 +170,9 @@ trait RemoteClientModule extends RemoteModule { self: RemoteSupport ⇒ /** Methods that needs to be implemented by a transport **/ - protected[akka] def send[T](message: Any, - senderOption: Option[ActorRef], - remoteAddress: InetSocketAddress, - recipient: ActorRef, - loader: Option[ClassLoader]): Unit -} + protected[akka] def send(message: Any, + senderOption: Option[ActorRef], + remoteAddress: InetSocketAddress, + recipient: ActorRef, + loader: Option[ClassLoader]): Unit +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index a1e05cc819..f507b08ba6 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -4,15 +4,11 @@ package akka.util import akka.dispatch.Envelope -import akka.config.ModuleNotAvailableException import akka.actor._ import DeploymentConfig.ReplicationScheme import akka.config.ModuleNotAvailableException -import akka.event.EventHandler import akka.cluster.ClusterNode -import akka.remote.{ RemoteSupport, RemoteService } import akka.routing.{ RoutedProps, Router } -import java.net.InetSocketAddress import akka.AkkaApplication object ReflectiveAccess { diff --git a/akka-remote/src/main/java/akka/remote/RemoteProtocol.java b/akka-remote/src/main/java/akka/remote/RemoteProtocol.java index dec400def6..1efeb0e27f 100644 --- a/akka-remote/src/main/java/akka/remote/RemoteProtocol.java +++ b/akka-remote/src/main/java/akka/remote/RemoteProtocol.java @@ -2107,13 +2107,18 @@ public final class RemoteProtocol { public interface RemoteControlProtocolOrBuilder extends com.google.protobuf.MessageOrBuilder { - // optional string cookie = 1; + // required .CommandType commandType = 1; + boolean hasCommandType(); + akka.remote.RemoteProtocol.CommandType getCommandType(); + + // optional string cookie = 2; boolean hasCookie(); String getCookie(); - // required .CommandType commandType = 2; - boolean hasCommandType(); - akka.remote.RemoteProtocol.CommandType getCommandType(); + // optional .Endpoint origin = 3; + boolean hasOrigin(); + akka.remote.RemoteProtocol.Endpoint getOrigin(); + akka.remote.RemoteProtocol.EndpointOrBuilder getOriginOrBuilder(); } public static final class RemoteControlProtocol extends com.google.protobuf.GeneratedMessage @@ -2144,11 +2149,21 @@ public final class RemoteProtocol { } private int bitField0_; - // optional string cookie = 1; - public static final int COOKIE_FIELD_NUMBER = 1; + // required .CommandType commandType = 1; + public static final int COMMANDTYPE_FIELD_NUMBER = 1; + private akka.remote.RemoteProtocol.CommandType commandType_; + public boolean hasCommandType() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public akka.remote.RemoteProtocol.CommandType getCommandType() { + return commandType_; + } + + // optional string cookie = 2; + public static final int COOKIE_FIELD_NUMBER = 2; private java.lang.Object cookie_; public boolean hasCookie() { - return ((bitField0_ & 0x00000001) == 0x00000001); + return ((bitField0_ & 0x00000002) == 0x00000002); } public String getCookie() { java.lang.Object ref = cookie_; @@ -2176,19 +2191,23 @@ public final class RemoteProtocol { } } - // required .CommandType commandType = 2; - public static final int COMMANDTYPE_FIELD_NUMBER = 2; - private akka.remote.RemoteProtocol.CommandType commandType_; - public boolean hasCommandType() { - return ((bitField0_ & 0x00000002) == 0x00000002); + // optional .Endpoint origin = 3; + public static final int ORIGIN_FIELD_NUMBER = 3; + private akka.remote.RemoteProtocol.Endpoint origin_; + public boolean hasOrigin() { + return ((bitField0_ & 0x00000004) == 0x00000004); } - public akka.remote.RemoteProtocol.CommandType getCommandType() { - return commandType_; + public akka.remote.RemoteProtocol.Endpoint getOrigin() { + return origin_; + } + public akka.remote.RemoteProtocol.EndpointOrBuilder getOriginOrBuilder() { + return origin_; } private void initFields() { - cookie_ = ""; commandType_ = akka.remote.RemoteProtocol.CommandType.CONNECT; + cookie_ = ""; + origin_ = akka.remote.RemoteProtocol.Endpoint.getDefaultInstance(); } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -2199,6 +2218,12 @@ public final class RemoteProtocol { memoizedIsInitialized = 0; return false; } + if (hasOrigin()) { + if (!getOrigin().isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + } memoizedIsInitialized = 1; return true; } @@ -2207,10 +2232,13 @@ public final class RemoteProtocol { throws java.io.IOException { getSerializedSize(); if (((bitField0_ & 0x00000001) == 0x00000001)) { - output.writeBytes(1, getCookieBytes()); + output.writeEnum(1, commandType_.getNumber()); } if (((bitField0_ & 0x00000002) == 0x00000002)) { - output.writeEnum(2, commandType_.getNumber()); + output.writeBytes(2, getCookieBytes()); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeMessage(3, origin_); } getUnknownFields().writeTo(output); } @@ -2223,11 +2251,15 @@ public final class RemoteProtocol { size = 0; if (((bitField0_ & 0x00000001) == 0x00000001)) { size += com.google.protobuf.CodedOutputStream - .computeBytesSize(1, getCookieBytes()); + .computeEnumSize(1, commandType_.getNumber()); } if (((bitField0_ & 0x00000002) == 0x00000002)) { size += com.google.protobuf.CodedOutputStream - .computeEnumSize(2, commandType_.getNumber()); + .computeBytesSize(2, getCookieBytes()); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(3, origin_); } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; @@ -2345,6 +2377,7 @@ public final class RemoteProtocol { } private void maybeForceBuilderInitialization() { if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + getOriginFieldBuilder(); } } private static Builder create() { @@ -2353,10 +2386,16 @@ public final class RemoteProtocol { public Builder clear() { super.clear(); - cookie_ = ""; - bitField0_ = (bitField0_ & ~0x00000001); commandType_ = akka.remote.RemoteProtocol.CommandType.CONNECT; + bitField0_ = (bitField0_ & ~0x00000001); + cookie_ = ""; bitField0_ = (bitField0_ & ~0x00000002); + if (originBuilder_ == null) { + origin_ = akka.remote.RemoteProtocol.Endpoint.getDefaultInstance(); + } else { + originBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -2398,11 +2437,19 @@ public final class RemoteProtocol { if (((from_bitField0_ & 0x00000001) == 0x00000001)) { to_bitField0_ |= 0x00000001; } - result.cookie_ = cookie_; + result.commandType_ = commandType_; if (((from_bitField0_ & 0x00000002) == 0x00000002)) { to_bitField0_ |= 0x00000002; } - result.commandType_ = commandType_; + result.cookie_ = cookie_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + if (originBuilder_ == null) { + result.origin_ = origin_; + } else { + result.origin_ = originBuilder_.build(); + } result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -2419,11 +2466,14 @@ public final class RemoteProtocol { public Builder mergeFrom(akka.remote.RemoteProtocol.RemoteControlProtocol other) { if (other == akka.remote.RemoteProtocol.RemoteControlProtocol.getDefaultInstance()) return this; + if (other.hasCommandType()) { + setCommandType(other.getCommandType()); + } if (other.hasCookie()) { setCookie(other.getCookie()); } - if (other.hasCommandType()) { - setCommandType(other.getCommandType()); + if (other.hasOrigin()) { + mergeOrigin(other.getOrigin()); } this.mergeUnknownFields(other.getUnknownFields()); return this; @@ -2434,6 +2484,568 @@ public final class RemoteProtocol { return false; } + if (hasOrigin()) { + if (!getOrigin().isInitialized()) { + + return false; + } + } + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder( + this.getUnknownFields()); + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + this.setUnknownFields(unknownFields.build()); + onChanged(); + return this; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + this.setUnknownFields(unknownFields.build()); + onChanged(); + return this; + } + break; + } + case 8: { + int rawValue = input.readEnum(); + akka.remote.RemoteProtocol.CommandType value = akka.remote.RemoteProtocol.CommandType.valueOf(rawValue); + if (value == null) { + unknownFields.mergeVarintField(1, rawValue); + } else { + bitField0_ |= 0x00000001; + commandType_ = value; + } + break; + } + case 18: { + bitField0_ |= 0x00000002; + cookie_ = input.readBytes(); + break; + } + case 26: { + akka.remote.RemoteProtocol.Endpoint.Builder subBuilder = akka.remote.RemoteProtocol.Endpoint.newBuilder(); + if (hasOrigin()) { + subBuilder.mergeFrom(getOrigin()); + } + input.readMessage(subBuilder, extensionRegistry); + setOrigin(subBuilder.buildPartial()); + break; + } + } + } + } + + private int bitField0_; + + // required .CommandType commandType = 1; + private akka.remote.RemoteProtocol.CommandType commandType_ = akka.remote.RemoteProtocol.CommandType.CONNECT; + public boolean hasCommandType() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public akka.remote.RemoteProtocol.CommandType getCommandType() { + return commandType_; + } + public Builder setCommandType(akka.remote.RemoteProtocol.CommandType value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + commandType_ = value; + onChanged(); + return this; + } + public Builder clearCommandType() { + bitField0_ = (bitField0_ & ~0x00000001); + commandType_ = akka.remote.RemoteProtocol.CommandType.CONNECT; + onChanged(); + return this; + } + + // optional string cookie = 2; + private java.lang.Object cookie_ = ""; + public boolean hasCookie() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public String getCookie() { + java.lang.Object ref = cookie_; + if (!(ref instanceof String)) { + String s = ((com.google.protobuf.ByteString) ref).toStringUtf8(); + cookie_ = s; + return s; + } else { + return (String) ref; + } + } + public Builder setCookie(String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + cookie_ = value; + onChanged(); + return this; + } + public Builder clearCookie() { + bitField0_ = (bitField0_ & ~0x00000002); + cookie_ = getDefaultInstance().getCookie(); + onChanged(); + return this; + } + void setCookie(com.google.protobuf.ByteString value) { + bitField0_ |= 0x00000002; + cookie_ = value; + onChanged(); + } + + // optional .Endpoint origin = 3; + private akka.remote.RemoteProtocol.Endpoint origin_ = akka.remote.RemoteProtocol.Endpoint.getDefaultInstance(); + private com.google.protobuf.SingleFieldBuilder< + akka.remote.RemoteProtocol.Endpoint, akka.remote.RemoteProtocol.Endpoint.Builder, akka.remote.RemoteProtocol.EndpointOrBuilder> originBuilder_; + public boolean hasOrigin() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public akka.remote.RemoteProtocol.Endpoint getOrigin() { + if (originBuilder_ == null) { + return origin_; + } else { + return originBuilder_.getMessage(); + } + } + public Builder setOrigin(akka.remote.RemoteProtocol.Endpoint value) { + if (originBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + origin_ = value; + onChanged(); + } else { + originBuilder_.setMessage(value); + } + bitField0_ |= 0x00000004; + return this; + } + public Builder setOrigin( + akka.remote.RemoteProtocol.Endpoint.Builder builderForValue) { + if (originBuilder_ == null) { + origin_ = builderForValue.build(); + onChanged(); + } else { + originBuilder_.setMessage(builderForValue.build()); + } + bitField0_ |= 0x00000004; + return this; + } + public Builder mergeOrigin(akka.remote.RemoteProtocol.Endpoint value) { + if (originBuilder_ == null) { + if (((bitField0_ & 0x00000004) == 0x00000004) && + origin_ != akka.remote.RemoteProtocol.Endpoint.getDefaultInstance()) { + origin_ = + akka.remote.RemoteProtocol.Endpoint.newBuilder(origin_).mergeFrom(value).buildPartial(); + } else { + origin_ = value; + } + onChanged(); + } else { + originBuilder_.mergeFrom(value); + } + bitField0_ |= 0x00000004; + return this; + } + public Builder clearOrigin() { + if (originBuilder_ == null) { + origin_ = akka.remote.RemoteProtocol.Endpoint.getDefaultInstance(); + onChanged(); + } else { + originBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000004); + return this; + } + public akka.remote.RemoteProtocol.Endpoint.Builder getOriginBuilder() { + bitField0_ |= 0x00000004; + onChanged(); + return getOriginFieldBuilder().getBuilder(); + } + public akka.remote.RemoteProtocol.EndpointOrBuilder getOriginOrBuilder() { + if (originBuilder_ != null) { + return originBuilder_.getMessageOrBuilder(); + } else { + return origin_; + } + } + private com.google.protobuf.SingleFieldBuilder< + akka.remote.RemoteProtocol.Endpoint, akka.remote.RemoteProtocol.Endpoint.Builder, akka.remote.RemoteProtocol.EndpointOrBuilder> + getOriginFieldBuilder() { + if (originBuilder_ == null) { + originBuilder_ = new com.google.protobuf.SingleFieldBuilder< + akka.remote.RemoteProtocol.Endpoint, akka.remote.RemoteProtocol.Endpoint.Builder, akka.remote.RemoteProtocol.EndpointOrBuilder>( + origin_, + getParentForChildren(), + isClean()); + origin_ = null; + } + return originBuilder_; + } + + // @@protoc_insertion_point(builder_scope:RemoteControlProtocol) + } + + static { + defaultInstance = new RemoteControlProtocol(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:RemoteControlProtocol) + } + + public interface EndpointOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // required string host = 1; + boolean hasHost(); + String getHost(); + + // required uint32 port = 2; + boolean hasPort(); + int getPort(); + } + public static final class Endpoint extends + com.google.protobuf.GeneratedMessage + implements EndpointOrBuilder { + // Use Endpoint.newBuilder() to construct. + private Endpoint(Builder builder) { + super(builder); + } + private Endpoint(boolean noInit) {} + + private static final Endpoint defaultInstance; + public static Endpoint getDefaultInstance() { + return defaultInstance; + } + + public Endpoint getDefaultInstanceForType() { + return defaultInstance; + } + + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.remote.RemoteProtocol.internal_static_Endpoint_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.remote.RemoteProtocol.internal_static_Endpoint_fieldAccessorTable; + } + + private int bitField0_; + // required string host = 1; + public static final int HOST_FIELD_NUMBER = 1; + private java.lang.Object host_; + public boolean hasHost() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public String getHost() { + java.lang.Object ref = host_; + if (ref instanceof String) { + return (String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + String s = bs.toStringUtf8(); + if (com.google.protobuf.Internal.isValidUtf8(bs)) { + host_ = s; + } + return s; + } + } + private com.google.protobuf.ByteString getHostBytes() { + java.lang.Object ref = host_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8((String) ref); + host_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + // required uint32 port = 2; + public static final int PORT_FIELD_NUMBER = 2; + private int port_; + public boolean hasPort() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public int getPort() { + return port_; + } + + private void initFields() { + host_ = ""; + port_ = 0; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasHost()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasPort()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeBytes(1, getHostBytes()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeUInt32(2, port_); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(1, getHostBytes()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeUInt32Size(2, port_); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static akka.remote.RemoteProtocol.Endpoint parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static akka.remote.RemoteProtocol.Endpoint parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static akka.remote.RemoteProtocol.Endpoint parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static akka.remote.RemoteProtocol.Endpoint parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static akka.remote.RemoteProtocol.Endpoint parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static akka.remote.RemoteProtocol.Endpoint parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static akka.remote.RemoteProtocol.Endpoint parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static akka.remote.RemoteProtocol.Endpoint parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input, extensionRegistry)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static akka.remote.RemoteProtocol.Endpoint parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static akka.remote.RemoteProtocol.Endpoint parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(akka.remote.RemoteProtocol.Endpoint prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder + implements akka.remote.RemoteProtocol.EndpointOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.remote.RemoteProtocol.internal_static_Endpoint_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.remote.RemoteProtocol.internal_static_Endpoint_fieldAccessorTable; + } + + // Construct using akka.remote.RemoteProtocol.Endpoint.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder(BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + host_ = ""; + bitField0_ = (bitField0_ & ~0x00000001); + port_ = 0; + bitField0_ = (bitField0_ & ~0x00000002); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return akka.remote.RemoteProtocol.Endpoint.getDescriptor(); + } + + public akka.remote.RemoteProtocol.Endpoint getDefaultInstanceForType() { + return akka.remote.RemoteProtocol.Endpoint.getDefaultInstance(); + } + + public akka.remote.RemoteProtocol.Endpoint build() { + akka.remote.RemoteProtocol.Endpoint result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + private akka.remote.RemoteProtocol.Endpoint buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + akka.remote.RemoteProtocol.Endpoint result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return result; + } + + public akka.remote.RemoteProtocol.Endpoint buildPartial() { + akka.remote.RemoteProtocol.Endpoint result = new akka.remote.RemoteProtocol.Endpoint(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.host_ = host_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.port_ = port_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof akka.remote.RemoteProtocol.Endpoint) { + return mergeFrom((akka.remote.RemoteProtocol.Endpoint)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(akka.remote.RemoteProtocol.Endpoint other) { + if (other == akka.remote.RemoteProtocol.Endpoint.getDefaultInstance()) return this; + if (other.hasHost()) { + setHost(other.getHost()); + } + if (other.hasPort()) { + setPort(other.getPort()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasHost()) { + + return false; + } + if (!hasPort()) { + + return false; + } return true; } @@ -2462,18 +3074,12 @@ public final class RemoteProtocol { } case 10: { bitField0_ |= 0x00000001; - cookie_ = input.readBytes(); + host_ = input.readBytes(); break; } case 16: { - int rawValue = input.readEnum(); - akka.remote.RemoteProtocol.CommandType value = akka.remote.RemoteProtocol.CommandType.valueOf(rawValue); - if (value == null) { - unknownFields.mergeVarintField(2, rawValue); - } else { - bitField0_ |= 0x00000002; - commandType_ = value; - } + bitField0_ |= 0x00000002; + port_ = input.readUInt32(); break; } } @@ -2482,75 +3088,72 @@ public final class RemoteProtocol { private int bitField0_; - // optional string cookie = 1; - private java.lang.Object cookie_ = ""; - public boolean hasCookie() { + // required string host = 1; + private java.lang.Object host_ = ""; + public boolean hasHost() { return ((bitField0_ & 0x00000001) == 0x00000001); } - public String getCookie() { - java.lang.Object ref = cookie_; + public String getHost() { + java.lang.Object ref = host_; if (!(ref instanceof String)) { String s = ((com.google.protobuf.ByteString) ref).toStringUtf8(); - cookie_ = s; + host_ = s; return s; } else { return (String) ref; } } - public Builder setCookie(String value) { + public Builder setHost(String value) { if (value == null) { throw new NullPointerException(); } bitField0_ |= 0x00000001; - cookie_ = value; + host_ = value; onChanged(); return this; } - public Builder clearCookie() { + public Builder clearHost() { bitField0_ = (bitField0_ & ~0x00000001); - cookie_ = getDefaultInstance().getCookie(); + host_ = getDefaultInstance().getHost(); onChanged(); return this; } - void setCookie(com.google.protobuf.ByteString value) { + void setHost(com.google.protobuf.ByteString value) { bitField0_ |= 0x00000001; - cookie_ = value; + host_ = value; onChanged(); } - // required .CommandType commandType = 2; - private akka.remote.RemoteProtocol.CommandType commandType_ = akka.remote.RemoteProtocol.CommandType.CONNECT; - public boolean hasCommandType() { + // required uint32 port = 2; + private int port_ ; + public boolean hasPort() { return ((bitField0_ & 0x00000002) == 0x00000002); } - public akka.remote.RemoteProtocol.CommandType getCommandType() { - return commandType_; + public int getPort() { + return port_; } - public Builder setCommandType(akka.remote.RemoteProtocol.CommandType value) { - if (value == null) { - throw new NullPointerException(); - } + public Builder setPort(int value) { bitField0_ |= 0x00000002; - commandType_ = value; + port_ = value; onChanged(); return this; } - public Builder clearCommandType() { + public Builder clearPort() { bitField0_ = (bitField0_ & ~0x00000002); - commandType_ = akka.remote.RemoteProtocol.CommandType.CONNECT; + port_ = 0; onChanged(); return this; } - // @@protoc_insertion_point(builder_scope:RemoteControlProtocol) + // @@protoc_insertion_point(builder_scope:Endpoint) } static { - defaultInstance = new RemoteControlProtocol(true); + defaultInstance = new Endpoint(true); defaultInstance.initFields(); } - // @@protoc_insertion_point(class_scope:RemoteControlProtocol) + // @@protoc_insertion_point(class_scope:Endpoint) } public interface ActorRefProtocolOrBuilder @@ -6649,6 +7252,11 @@ public final class RemoteProtocol { private static com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_RemoteControlProtocol_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor + internal_static_Endpoint_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_Endpoint_fieldAccessorTable; private static com.google.protobuf.Descriptors.Descriptor internal_static_ActorRefProtocol_descriptor; private static @@ -6706,37 +7314,39 @@ public final class RemoteProtocol { "ssage\030\002 \001(\0132\020.MessageProtocol\022%\n\texcepti" + "on\030\003 \001(\0132\022.ExceptionProtocol\022!\n\006sender\030\004" + " \001(\0132\021.ActorRefProtocol\022(\n\010metadata\030\005 \003(" + - "\0132\026.MetadataEntryProtocol\"J\n\025RemoteContr" + - "olProtocol\022\016\n\006cookie\030\001 \001(\t\022!\n\013commandTyp", - "e\030\002 \002(\0162\014.CommandType\"?\n\020ActorRefProtoco" + - "l\022\017\n\007address\030\001 \002(\t\022\014\n\004host\030\002 \002(\t\022\014\n\004port" + - "\030\003 \002(\r\";\n\017MessageProtocol\022\017\n\007message\030\001 \002" + - "(\014\022\027\n\017messageManifest\030\002 \001(\014\")\n\014UuidProto" + - "col\022\014\n\004high\030\001 \002(\004\022\013\n\003low\030\002 \002(\004\"3\n\025Metada" + - "taEntryProtocol\022\013\n\003key\030\001 \002(\t\022\r\n\005value\030\002 " + - "\002(\014\"1\n\017AddressProtocol\022\020\n\010hostname\030\001 \002(\t" + - "\022\014\n\004port\030\002 \002(\r\"7\n\021ExceptionProtocol\022\021\n\tc" + - "lassname\030\001 \002(\t\022\017\n\007message\030\002 \002(\t\"\256\001\n!Remo" + - "teSystemDaemonMessageProtocol\0223\n\013message", - "Type\030\001 \002(\0162\036.RemoteSystemDaemonMessageTy" + - "pe\022\024\n\014actorAddress\030\002 \001(\t\022\017\n\007payload\030\003 \001(" + - "\014\022-\n\026replicateActorFromUuid\030\004 \001(\0132\r.Uuid" + - "Protocol\"y\n\035DurableMailboxMessageProtoco" + - "l\022$\n\trecipient\030\001 \002(\0132\021.ActorRefProtocol\022" + - "!\n\006sender\030\002 \001(\0132\021.ActorRefProtocol\022\017\n\007me" + - "ssage\030\003 \002(\014*(\n\013CommandType\022\013\n\007CONNECT\020\001\022" + - "\014\n\010SHUTDOWN\020\002*K\n\026ReplicationStorageType\022" + - "\r\n\tTRANSIENT\020\001\022\023\n\017TRANSACTION_LOG\020\002\022\r\n\tD" + - "ATA_GRID\020\003*>\n\027ReplicationStrategyType\022\021\n", - "\rWRITE_THROUGH\020\001\022\020\n\014WRITE_BEHIND\020\002*\241\002\n\035R" + - "emoteSystemDaemonMessageType\022\010\n\004STOP\020\001\022\007" + - "\n\003USE\020\002\022\013\n\007RELEASE\020\003\022\022\n\016MAKE_AVAILABLE\020\004" + - "\022\024\n\020MAKE_UNAVAILABLE\020\005\022\016\n\nDISCONNECT\020\006\022\r" + - "\n\tRECONNECT\020\007\022\n\n\006RESIGN\020\010\022\n\n\006GOSSIP\020\t\022\031\n" + - "\025FAIL_OVER_CONNECTIONS\020\024\022\026\n\022FUNCTION_FUN" + - "0_UNIT\020\025\022\025\n\021FUNCTION_FUN0_ANY\020\026\022\032\n\026FUNCT" + - "ION_FUN1_ARG_UNIT\020\027\022\031\n\025FUNCTION_FUN1_ARG" + - "_ANY\020\030B\017\n\013akka.remoteH\001" + "\0132\026.MetadataEntryProtocol\"e\n\025RemoteContr" + + "olProtocol\022!\n\013commandType\030\001 \002(\0162\014.Comman", + "dType\022\016\n\006cookie\030\002 \001(\t\022\031\n\006origin\030\003 \001(\0132\t." + + "Endpoint\"&\n\010Endpoint\022\014\n\004host\030\001 \002(\t\022\014\n\004po" + + "rt\030\002 \002(\r\"?\n\020ActorRefProtocol\022\017\n\007address\030" + + "\001 \002(\t\022\014\n\004host\030\002 \002(\t\022\014\n\004port\030\003 \002(\r\";\n\017Mes" + + "sageProtocol\022\017\n\007message\030\001 \002(\014\022\027\n\017message" + + "Manifest\030\002 \001(\014\")\n\014UuidProtocol\022\014\n\004high\030\001" + + " \002(\004\022\013\n\003low\030\002 \002(\004\"3\n\025MetadataEntryProtoc" + + "ol\022\013\n\003key\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"1\n\017Addres" + + "sProtocol\022\020\n\010hostname\030\001 \002(\t\022\014\n\004port\030\002 \002(" + + "\r\"7\n\021ExceptionProtocol\022\021\n\tclassname\030\001 \002(", + "\t\022\017\n\007message\030\002 \002(\t\"\256\001\n!RemoteSystemDaemo" + + "nMessageProtocol\0223\n\013messageType\030\001 \002(\0162\036." + + "RemoteSystemDaemonMessageType\022\024\n\014actorAd" + + "dress\030\002 \001(\t\022\017\n\007payload\030\003 \001(\014\022-\n\026replicat" + + "eActorFromUuid\030\004 \001(\0132\r.UuidProtocol\"y\n\035D" + + "urableMailboxMessageProtocol\022$\n\trecipien" + + "t\030\001 \002(\0132\021.ActorRefProtocol\022!\n\006sender\030\002 \001" + + "(\0132\021.ActorRefProtocol\022\017\n\007message\030\003 \002(\014*(" + + "\n\013CommandType\022\013\n\007CONNECT\020\001\022\014\n\010SHUTDOWN\020\002" + + "*K\n\026ReplicationStorageType\022\r\n\tTRANSIENT\020", + "\001\022\023\n\017TRANSACTION_LOG\020\002\022\r\n\tDATA_GRID\020\003*>\n" + + "\027ReplicationStrategyType\022\021\n\rWRITE_THROUG" + + "H\020\001\022\020\n\014WRITE_BEHIND\020\002*\241\002\n\035RemoteSystemDa" + + "emonMessageType\022\010\n\004STOP\020\001\022\007\n\003USE\020\002\022\013\n\007RE" + + "LEASE\020\003\022\022\n\016MAKE_AVAILABLE\020\004\022\024\n\020MAKE_UNAV" + + "AILABLE\020\005\022\016\n\nDISCONNECT\020\006\022\r\n\tRECONNECT\020\007" + + "\022\n\n\006RESIGN\020\010\022\n\n\006GOSSIP\020\t\022\031\n\025FAIL_OVER_CO" + + "NNECTIONS\020\024\022\026\n\022FUNCTION_FUN0_UNIT\020\025\022\025\n\021F" + + "UNCTION_FUN0_ANY\020\026\022\032\n\026FUNCTION_FUN1_ARG_" + + "UNIT\020\027\022\031\n\025FUNCTION_FUN1_ARG_ANY\020\030B\017\n\013akk", + "a.remoteH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -6764,11 +7374,19 @@ public final class RemoteProtocol { internal_static_RemoteControlProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_RemoteControlProtocol_descriptor, - new java.lang.String[] { "Cookie", "CommandType", }, + new java.lang.String[] { "CommandType", "Cookie", "Origin", }, akka.remote.RemoteProtocol.RemoteControlProtocol.class, akka.remote.RemoteProtocol.RemoteControlProtocol.Builder.class); - internal_static_ActorRefProtocol_descriptor = + internal_static_Endpoint_descriptor = getDescriptor().getMessageTypes().get(3); + internal_static_Endpoint_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_Endpoint_descriptor, + new java.lang.String[] { "Host", "Port", }, + akka.remote.RemoteProtocol.Endpoint.class, + akka.remote.RemoteProtocol.Endpoint.Builder.class); + internal_static_ActorRefProtocol_descriptor = + getDescriptor().getMessageTypes().get(4); internal_static_ActorRefProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ActorRefProtocol_descriptor, @@ -6776,7 +7394,7 @@ public final class RemoteProtocol { akka.remote.RemoteProtocol.ActorRefProtocol.class, akka.remote.RemoteProtocol.ActorRefProtocol.Builder.class); internal_static_MessageProtocol_descriptor = - getDescriptor().getMessageTypes().get(4); + getDescriptor().getMessageTypes().get(5); internal_static_MessageProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_MessageProtocol_descriptor, @@ -6784,7 +7402,7 @@ public final class RemoteProtocol { akka.remote.RemoteProtocol.MessageProtocol.class, akka.remote.RemoteProtocol.MessageProtocol.Builder.class); internal_static_UuidProtocol_descriptor = - getDescriptor().getMessageTypes().get(5); + getDescriptor().getMessageTypes().get(6); internal_static_UuidProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_UuidProtocol_descriptor, @@ -6792,7 +7410,7 @@ public final class RemoteProtocol { akka.remote.RemoteProtocol.UuidProtocol.class, akka.remote.RemoteProtocol.UuidProtocol.Builder.class); internal_static_MetadataEntryProtocol_descriptor = - getDescriptor().getMessageTypes().get(6); + getDescriptor().getMessageTypes().get(7); internal_static_MetadataEntryProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_MetadataEntryProtocol_descriptor, @@ -6800,7 +7418,7 @@ public final class RemoteProtocol { akka.remote.RemoteProtocol.MetadataEntryProtocol.class, akka.remote.RemoteProtocol.MetadataEntryProtocol.Builder.class); internal_static_AddressProtocol_descriptor = - getDescriptor().getMessageTypes().get(7); + getDescriptor().getMessageTypes().get(8); internal_static_AddressProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_AddressProtocol_descriptor, @@ -6808,7 +7426,7 @@ public final class RemoteProtocol { akka.remote.RemoteProtocol.AddressProtocol.class, akka.remote.RemoteProtocol.AddressProtocol.Builder.class); internal_static_ExceptionProtocol_descriptor = - getDescriptor().getMessageTypes().get(8); + getDescriptor().getMessageTypes().get(9); internal_static_ExceptionProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ExceptionProtocol_descriptor, @@ -6816,7 +7434,7 @@ public final class RemoteProtocol { akka.remote.RemoteProtocol.ExceptionProtocol.class, akka.remote.RemoteProtocol.ExceptionProtocol.Builder.class); internal_static_RemoteSystemDaemonMessageProtocol_descriptor = - getDescriptor().getMessageTypes().get(9); + getDescriptor().getMessageTypes().get(10); internal_static_RemoteSystemDaemonMessageProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_RemoteSystemDaemonMessageProtocol_descriptor, @@ -6824,7 +7442,7 @@ public final class RemoteProtocol { akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol.class, akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol.Builder.class); internal_static_DurableMailboxMessageProtocol_descriptor = - getDescriptor().getMessageTypes().get(10); + getDescriptor().getMessageTypes().get(11); internal_static_DurableMailboxMessageProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_DurableMailboxMessageProtocol_descriptor, diff --git a/akka-remote/src/main/protocol/RemoteProtocol.proto b/akka-remote/src/main/protocol/RemoteProtocol.proto index 8dda9e4a11..9d6ad23bd7 100644 --- a/akka-remote/src/main/protocol/RemoteProtocol.proto +++ b/akka-remote/src/main/protocol/RemoteProtocol.proto @@ -31,8 +31,14 @@ message RemoteMessageProtocol { * Defines some control messages for the remoting */ message RemoteControlProtocol { - optional string cookie = 1; - required CommandType commandType = 2; + required CommandType commandType = 1; + optional string cookie = 2; + optional Endpoint origin = 3; +} + +message Endpoint { + required string host = 1; + required uint32 port = 2; } /** diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index 0d67a15985..50d1ebb87f 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -27,7 +27,7 @@ import akka.serialization.{ JavaSerializer, Serialization, Serializer, Compressi * * @author Jonas Bonér */ -class Remote(val app: AkkaApplication) extends RemoteService { +class Remote(val app: AkkaApplication) { import app._ import app.config diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 84b2e5a71c..99c47e5285 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -251,9 +251,7 @@ private[akka] case class RemoteActorRef private[akka] ( protected[akka] def sendSystemMessage(message: SystemMessage): Unit = unsupported - def postMessageToMailbox(message: Any, sender: ActorRef) { - remote.send[Any](message, Option(sender), remoteAddress, this, loader) - } + def postMessageToMailbox(message: Any, sender: ActorRef): Unit = remote.send(message, Option(sender), remoteAddress, this, loader) def ?(message: Any)(implicit timeout: Timeout): Future[Any] = remote.app.provider.ask(message, this, timeout) @@ -265,7 +263,7 @@ private[akka] case class RemoteActorRef private[akka] ( synchronized { if (running) { running = false - remote.send[Any](new Terminate(), None, remoteAddress, this, loader) + remote.send(new Terminate(), None, remoteAddress, this, loader) } } } diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index d0e4c59efd..f70e479a11 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -40,12 +40,12 @@ trait NettyRemoteClientModule extends RemoteClientModule { def app: AkkaApplication - protected[akka] def send[T](message: Any, - senderOption: Option[ActorRef], - recipientAddress: InetSocketAddress, - recipient: ActorRef, - loader: Option[ClassLoader]): Unit = - withClientFor(recipientAddress, loader) { _.send[T](message, senderOption, recipient) } + protected[akka] def send(message: Any, + senderOption: Option[ActorRef], + recipientAddress: InetSocketAddress, + recipient: ActorRef, + loader: Option[ClassLoader]): Unit = + withClientFor(recipientAddress, loader) { _.send(message, senderOption, recipient) } private[akka] def withClientFor[T]( address: InetSocketAddress, loader: Option[ClassLoader])(body: RemoteClient ⇒ T): T = { @@ -140,14 +140,14 @@ abstract class RemoteClient private[akka] ( /** * Converts the message to the wireprotocol and sends the message across the wire */ - def send[T](message: Any, senderOption: Option[ActorRef], recipient: ActorRef) { + def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef) { send(createRemoteMessageProtocolBuilder(Left(recipient), Right(message), senderOption).build) } /** * Sends the message across the wire */ - def send[T](request: RemoteMessageProtocol) { + def send(request: RemoteMessageProtocol) { if (isRunning) { //TODO FIXME RACY app.eventHandler.debug(this, "Sending to connection [%s] message [%s]".format(remoteAddress, new RemoteMessage(request, remoteSupport))) @@ -210,6 +210,7 @@ class ActiveRemoteClient private[akka] ( def sendSecureCookie(connection: ChannelFuture) { val handshake = RemoteControlProtocol.newBuilder.setCommandType(CommandType.CONNECT) if (SECURE_COOKIE.nonEmpty) handshake.setCookie(SECURE_COOKIE.get) + handshake.setOrigin(RemoteProtocol.Endpoint.newBuilder().setHost(app.hostname).setPort(app.port).build) connection.getChannel.write(createControlEnvelope(handshake.build)) } @@ -353,9 +354,7 @@ class ActiveRemoteClientHandler( case arp: AkkaRemoteProtocol if arp.hasInstruction ⇒ val rcp = arp.getInstruction rcp.getCommandType match { - case CommandType.SHUTDOWN ⇒ akka.dispatch.Future { - client.module.shutdownClientConnection(remoteAddress) - } + case CommandType.SHUTDOWN ⇒ akka.dispatch.Future { client.module.shutdownClientConnection(remoteAddress) } } case arp: AkkaRemoteProtocol if arp.hasMessage ⇒ @@ -570,7 +569,8 @@ class RemoteServerAuthenticationHandler(secureCookie: Option[String]) extends Si case `authenticated` ⇒ ctx.sendUpstream(event) case null ⇒ event.getMessage match { case remoteProtocol: AkkaRemoteProtocol if remoteProtocol.hasInstruction ⇒ - remoteProtocol.getInstruction.getCookie match { + val instruction = remoteProtocol.getInstruction + instruction.getCookie match { case `cookie` ⇒ ctx.setAttachment(authenticated) ctx.sendUpstream(event)