From e958987e5bb4ca66bb36fb3ae55866970204bf17 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 3 Nov 2011 19:32:53 +0100 Subject: [PATCH] Switching to AddressProtocol for the remote origin address --- .../actor/LocalActorRefProviderSpec.scala | 14 +- .../scala/akka/actor/ActorRefProvider.scala | 1 + .../src/main/scala/akka/actor/Deployer.scala | 25 +- .../main/java/akka/remote/RemoteProtocol.java | 588 ++---------------- .../src/main/protocol/RemoteProtocol.proto | 7 +- .../remote/netty/NettyRemoteSupport.scala | 5 +- 6 files changed, 79 insertions(+), 561 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala index cb3758081a..b6f7196f81 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/LocalActorRefProviderSpec.scala @@ -17,15 +17,11 @@ class LocalActorRefProviderSpec extends AkkaSpec { provider.isInstanceOf[LocalActorRefProvider] must be(true) - implicit val timeout = Timeout(30 seconds) - - val actors: Seq[Future[ActorRef]] = - (0 until 100) flatMap { i ⇒ // 100 concurrent runs - val address = "new-actor" + i - (1 to 4) map { _ ⇒ Future { provider.actorOf(Props(c ⇒ { case _ ⇒ }), app.guardian, address) } } - } - - actors.map(_.get).distinct.size must be(100) + (0 until 100) foreach { i ⇒ // 100 concurrent runs + val address = "new-actor" + i + implicit val timeout = Timeout(30 seconds) + ((1 to 4) map { _ ⇒ Future { provider.actorOf(Props(c ⇒ { case _ ⇒ }), app.guardian, address, true) } }).map(_.get).distinct.size must be(1) + } } } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 5c05fd867b..973ed36ef6 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -160,6 +160,7 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { // create a routed actor ref case deploy @ Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, _, DeploymentConfig.LocalScope)) ⇒ + val routerFactory: () ⇒ Router = DeploymentConfig.routerTypeFor(routerType) match { case RouterType.Direct ⇒ () ⇒ new DirectRouter case RouterType.Random ⇒ () ⇒ new RandomRouter diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index bee5027839..e900893604 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -36,8 +36,6 @@ class Deployer(val app: AkkaApplication) extends ActorDeployer { val deploymentConfig = new DeploymentConfig(app) - // val defaultAddress = Node(Config.nodename) - lazy val instance: ActorDeployer = { val deployer = if (app.reflective.ClusterModule.isEnabled) app.reflective.ClusterModule.clusterDeployer else LocalDeployer deployer.init(deploymentsInConfig) @@ -74,22 +72,13 @@ class Deployer(val app: AkkaApplication) extends ActorDeployer { } private[akka] def lookupDeploymentFor(address: String): Option[Deploy] = { - val deployment_? = instance.lookupDeploymentFor(address) - - if (deployment_?.isDefined && (deployment_?.get ne null)) deployment_? - else { - val newDeployment = try { - lookupInConfig(address) - } catch { - case e: ConfigurationException ⇒ - app.eventHandler.error(e, this, e.getMessage) //TODO FIXME I do not condone log AND rethrow - throw e - } - - newDeployment match { - case None | Some(null) ⇒ None - case Some(d) ⇒ deploy(d); newDeployment // deploy and cache it - } + instance.lookupDeploymentFor(address) match { + case s @ Some(d) if d ne null ⇒ s + case _ ⇒ + lookupInConfig(address) match { + case None | Some(null) ⇒ None + case s @ Some(d) ⇒ deploy(d); s // deploy and cache it + } } } diff --git a/akka-remote/src/main/java/akka/remote/RemoteProtocol.java b/akka-remote/src/main/java/akka/remote/RemoteProtocol.java index 1efeb0e27f..8d1d7f8c94 100644 --- a/akka-remote/src/main/java/akka/remote/RemoteProtocol.java +++ b/akka-remote/src/main/java/akka/remote/RemoteProtocol.java @@ -2115,10 +2115,10 @@ public final class RemoteProtocol { boolean hasCookie(); String getCookie(); - // optional .Endpoint origin = 3; + // optional .AddressProtocol origin = 3; boolean hasOrigin(); - akka.remote.RemoteProtocol.Endpoint getOrigin(); - akka.remote.RemoteProtocol.EndpointOrBuilder getOriginOrBuilder(); + akka.remote.RemoteProtocol.AddressProtocol getOrigin(); + akka.remote.RemoteProtocol.AddressProtocolOrBuilder getOriginOrBuilder(); } public static final class RemoteControlProtocol extends com.google.protobuf.GeneratedMessage @@ -2191,23 +2191,23 @@ public final class RemoteProtocol { } } - // optional .Endpoint origin = 3; + // optional .AddressProtocol origin = 3; public static final int ORIGIN_FIELD_NUMBER = 3; - private akka.remote.RemoteProtocol.Endpoint origin_; + private akka.remote.RemoteProtocol.AddressProtocol origin_; public boolean hasOrigin() { return ((bitField0_ & 0x00000004) == 0x00000004); } - public akka.remote.RemoteProtocol.Endpoint getOrigin() { + public akka.remote.RemoteProtocol.AddressProtocol getOrigin() { return origin_; } - public akka.remote.RemoteProtocol.EndpointOrBuilder getOriginOrBuilder() { + public akka.remote.RemoteProtocol.AddressProtocolOrBuilder getOriginOrBuilder() { return origin_; } private void initFields() { commandType_ = akka.remote.RemoteProtocol.CommandType.CONNECT; cookie_ = ""; - origin_ = akka.remote.RemoteProtocol.Endpoint.getDefaultInstance(); + origin_ = akka.remote.RemoteProtocol.AddressProtocol.getDefaultInstance(); } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -2391,7 +2391,7 @@ public final class RemoteProtocol { cookie_ = ""; bitField0_ = (bitField0_ & ~0x00000002); if (originBuilder_ == null) { - origin_ = akka.remote.RemoteProtocol.Endpoint.getDefaultInstance(); + origin_ = akka.remote.RemoteProtocol.AddressProtocol.getDefaultInstance(); } else { originBuilder_.clear(); } @@ -2533,7 +2533,7 @@ public final class RemoteProtocol { break; } case 26: { - akka.remote.RemoteProtocol.Endpoint.Builder subBuilder = akka.remote.RemoteProtocol.Endpoint.newBuilder(); + akka.remote.RemoteProtocol.AddressProtocol.Builder subBuilder = akka.remote.RemoteProtocol.AddressProtocol.newBuilder(); if (hasOrigin()) { subBuilder.mergeFrom(getOrigin()); } @@ -2607,21 +2607,21 @@ public final class RemoteProtocol { onChanged(); } - // optional .Endpoint origin = 3; - private akka.remote.RemoteProtocol.Endpoint origin_ = akka.remote.RemoteProtocol.Endpoint.getDefaultInstance(); + // optional .AddressProtocol origin = 3; + private akka.remote.RemoteProtocol.AddressProtocol origin_ = akka.remote.RemoteProtocol.AddressProtocol.getDefaultInstance(); private com.google.protobuf.SingleFieldBuilder< - akka.remote.RemoteProtocol.Endpoint, akka.remote.RemoteProtocol.Endpoint.Builder, akka.remote.RemoteProtocol.EndpointOrBuilder> originBuilder_; + akka.remote.RemoteProtocol.AddressProtocol, akka.remote.RemoteProtocol.AddressProtocol.Builder, akka.remote.RemoteProtocol.AddressProtocolOrBuilder> originBuilder_; public boolean hasOrigin() { return ((bitField0_ & 0x00000004) == 0x00000004); } - public akka.remote.RemoteProtocol.Endpoint getOrigin() { + public akka.remote.RemoteProtocol.AddressProtocol getOrigin() { if (originBuilder_ == null) { return origin_; } else { return originBuilder_.getMessage(); } } - public Builder setOrigin(akka.remote.RemoteProtocol.Endpoint value) { + public Builder setOrigin(akka.remote.RemoteProtocol.AddressProtocol value) { if (originBuilder_ == null) { if (value == null) { throw new NullPointerException(); @@ -2635,7 +2635,7 @@ public final class RemoteProtocol { return this; } public Builder setOrigin( - akka.remote.RemoteProtocol.Endpoint.Builder builderForValue) { + akka.remote.RemoteProtocol.AddressProtocol.Builder builderForValue) { if (originBuilder_ == null) { origin_ = builderForValue.build(); onChanged(); @@ -2645,12 +2645,12 @@ public final class RemoteProtocol { bitField0_ |= 0x00000004; return this; } - public Builder mergeOrigin(akka.remote.RemoteProtocol.Endpoint value) { + public Builder mergeOrigin(akka.remote.RemoteProtocol.AddressProtocol value) { if (originBuilder_ == null) { if (((bitField0_ & 0x00000004) == 0x00000004) && - origin_ != akka.remote.RemoteProtocol.Endpoint.getDefaultInstance()) { + origin_ != akka.remote.RemoteProtocol.AddressProtocol.getDefaultInstance()) { origin_ = - akka.remote.RemoteProtocol.Endpoint.newBuilder(origin_).mergeFrom(value).buildPartial(); + akka.remote.RemoteProtocol.AddressProtocol.newBuilder(origin_).mergeFrom(value).buildPartial(); } else { origin_ = value; } @@ -2663,7 +2663,7 @@ public final class RemoteProtocol { } public Builder clearOrigin() { if (originBuilder_ == null) { - origin_ = akka.remote.RemoteProtocol.Endpoint.getDefaultInstance(); + origin_ = akka.remote.RemoteProtocol.AddressProtocol.getDefaultInstance(); onChanged(); } else { originBuilder_.clear(); @@ -2671,12 +2671,12 @@ public final class RemoteProtocol { bitField0_ = (bitField0_ & ~0x00000004); return this; } - public akka.remote.RemoteProtocol.Endpoint.Builder getOriginBuilder() { + public akka.remote.RemoteProtocol.AddressProtocol.Builder getOriginBuilder() { bitField0_ |= 0x00000004; onChanged(); return getOriginFieldBuilder().getBuilder(); } - public akka.remote.RemoteProtocol.EndpointOrBuilder getOriginOrBuilder() { + public akka.remote.RemoteProtocol.AddressProtocolOrBuilder getOriginOrBuilder() { if (originBuilder_ != null) { return originBuilder_.getMessageOrBuilder(); } else { @@ -2684,11 +2684,11 @@ public final class RemoteProtocol { } } private com.google.protobuf.SingleFieldBuilder< - akka.remote.RemoteProtocol.Endpoint, akka.remote.RemoteProtocol.Endpoint.Builder, akka.remote.RemoteProtocol.EndpointOrBuilder> + akka.remote.RemoteProtocol.AddressProtocol, akka.remote.RemoteProtocol.AddressProtocol.Builder, akka.remote.RemoteProtocol.AddressProtocolOrBuilder> getOriginFieldBuilder() { if (originBuilder_ == null) { originBuilder_ = new com.google.protobuf.SingleFieldBuilder< - akka.remote.RemoteProtocol.Endpoint, akka.remote.RemoteProtocol.Endpoint.Builder, akka.remote.RemoteProtocol.EndpointOrBuilder>( + akka.remote.RemoteProtocol.AddressProtocol, akka.remote.RemoteProtocol.AddressProtocol.Builder, akka.remote.RemoteProtocol.AddressProtocolOrBuilder>( origin_, getParentForChildren(), isClean()); @@ -2708,454 +2708,6 @@ public final class RemoteProtocol { // @@protoc_insertion_point(class_scope:RemoteControlProtocol) } - public interface EndpointOrBuilder - extends com.google.protobuf.MessageOrBuilder { - - // required string host = 1; - boolean hasHost(); - String getHost(); - - // required uint32 port = 2; - boolean hasPort(); - int getPort(); - } - public static final class Endpoint extends - com.google.protobuf.GeneratedMessage - implements EndpointOrBuilder { - // Use Endpoint.newBuilder() to construct. - private Endpoint(Builder builder) { - super(builder); - } - private Endpoint(boolean noInit) {} - - private static final Endpoint defaultInstance; - public static Endpoint getDefaultInstance() { - return defaultInstance; - } - - public Endpoint getDefaultInstanceForType() { - return defaultInstance; - } - - public static final com.google.protobuf.Descriptors.Descriptor - getDescriptor() { - return akka.remote.RemoteProtocol.internal_static_Endpoint_descriptor; - } - - protected com.google.protobuf.GeneratedMessage.FieldAccessorTable - internalGetFieldAccessorTable() { - return akka.remote.RemoteProtocol.internal_static_Endpoint_fieldAccessorTable; - } - - private int bitField0_; - // required string host = 1; - public static final int HOST_FIELD_NUMBER = 1; - private java.lang.Object host_; - public boolean hasHost() { - return ((bitField0_ & 0x00000001) == 0x00000001); - } - public String getHost() { - java.lang.Object ref = host_; - if (ref instanceof String) { - return (String) ref; - } else { - com.google.protobuf.ByteString bs = - (com.google.protobuf.ByteString) ref; - String s = bs.toStringUtf8(); - if (com.google.protobuf.Internal.isValidUtf8(bs)) { - host_ = s; - } - return s; - } - } - private com.google.protobuf.ByteString getHostBytes() { - java.lang.Object ref = host_; - if (ref instanceof String) { - com.google.protobuf.ByteString b = - com.google.protobuf.ByteString.copyFromUtf8((String) ref); - host_ = b; - return b; - } else { - return (com.google.protobuf.ByteString) ref; - } - } - - // required uint32 port = 2; - public static final int PORT_FIELD_NUMBER = 2; - private int port_; - public boolean hasPort() { - return ((bitField0_ & 0x00000002) == 0x00000002); - } - public int getPort() { - return port_; - } - - private void initFields() { - host_ = ""; - port_ = 0; - } - private byte memoizedIsInitialized = -1; - public final boolean isInitialized() { - byte isInitialized = memoizedIsInitialized; - if (isInitialized != -1) return isInitialized == 1; - - if (!hasHost()) { - memoizedIsInitialized = 0; - return false; - } - if (!hasPort()) { - memoizedIsInitialized = 0; - return false; - } - memoizedIsInitialized = 1; - return true; - } - - public void writeTo(com.google.protobuf.CodedOutputStream output) - throws java.io.IOException { - getSerializedSize(); - if (((bitField0_ & 0x00000001) == 0x00000001)) { - output.writeBytes(1, getHostBytes()); - } - if (((bitField0_ & 0x00000002) == 0x00000002)) { - output.writeUInt32(2, port_); - } - getUnknownFields().writeTo(output); - } - - private int memoizedSerializedSize = -1; - public int getSerializedSize() { - int size = memoizedSerializedSize; - if (size != -1) return size; - - size = 0; - if (((bitField0_ & 0x00000001) == 0x00000001)) { - size += com.google.protobuf.CodedOutputStream - .computeBytesSize(1, getHostBytes()); - } - if (((bitField0_ & 0x00000002) == 0x00000002)) { - size += com.google.protobuf.CodedOutputStream - .computeUInt32Size(2, port_); - } - size += getUnknownFields().getSerializedSize(); - memoizedSerializedSize = size; - return size; - } - - private static final long serialVersionUID = 0L; - @java.lang.Override - protected java.lang.Object writeReplace() - throws java.io.ObjectStreamException { - return super.writeReplace(); - } - - public static akka.remote.RemoteProtocol.Endpoint parseFrom( - com.google.protobuf.ByteString data) - throws com.google.protobuf.InvalidProtocolBufferException { - return newBuilder().mergeFrom(data).buildParsed(); - } - public static akka.remote.RemoteProtocol.Endpoint parseFrom( - com.google.protobuf.ByteString data, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws com.google.protobuf.InvalidProtocolBufferException { - return newBuilder().mergeFrom(data, extensionRegistry) - .buildParsed(); - } - public static akka.remote.RemoteProtocol.Endpoint parseFrom(byte[] data) - throws com.google.protobuf.InvalidProtocolBufferException { - return newBuilder().mergeFrom(data).buildParsed(); - } - public static akka.remote.RemoteProtocol.Endpoint parseFrom( - byte[] data, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws com.google.protobuf.InvalidProtocolBufferException { - return newBuilder().mergeFrom(data, extensionRegistry) - .buildParsed(); - } - public static akka.remote.RemoteProtocol.Endpoint parseFrom(java.io.InputStream input) - throws java.io.IOException { - return newBuilder().mergeFrom(input).buildParsed(); - } - public static akka.remote.RemoteProtocol.Endpoint parseFrom( - java.io.InputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - return newBuilder().mergeFrom(input, extensionRegistry) - .buildParsed(); - } - public static akka.remote.RemoteProtocol.Endpoint parseDelimitedFrom(java.io.InputStream input) - throws java.io.IOException { - Builder builder = newBuilder(); - if (builder.mergeDelimitedFrom(input)) { - return builder.buildParsed(); - } else { - return null; - } - } - public static akka.remote.RemoteProtocol.Endpoint parseDelimitedFrom( - java.io.InputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - Builder builder = newBuilder(); - if (builder.mergeDelimitedFrom(input, extensionRegistry)) { - return builder.buildParsed(); - } else { - return null; - } - } - public static akka.remote.RemoteProtocol.Endpoint parseFrom( - com.google.protobuf.CodedInputStream input) - throws java.io.IOException { - return newBuilder().mergeFrom(input).buildParsed(); - } - public static akka.remote.RemoteProtocol.Endpoint parseFrom( - com.google.protobuf.CodedInputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - return newBuilder().mergeFrom(input, extensionRegistry) - .buildParsed(); - } - - public static Builder newBuilder() { return Builder.create(); } - public Builder newBuilderForType() { return newBuilder(); } - public static Builder newBuilder(akka.remote.RemoteProtocol.Endpoint prototype) { - return newBuilder().mergeFrom(prototype); - } - public Builder toBuilder() { return newBuilder(this); } - - @java.lang.Override - protected Builder newBuilderForType( - com.google.protobuf.GeneratedMessage.BuilderParent parent) { - Builder builder = new Builder(parent); - return builder; - } - public static final class Builder extends - com.google.protobuf.GeneratedMessage.Builder - implements akka.remote.RemoteProtocol.EndpointOrBuilder { - public static final com.google.protobuf.Descriptors.Descriptor - getDescriptor() { - return akka.remote.RemoteProtocol.internal_static_Endpoint_descriptor; - } - - protected com.google.protobuf.GeneratedMessage.FieldAccessorTable - internalGetFieldAccessorTable() { - return akka.remote.RemoteProtocol.internal_static_Endpoint_fieldAccessorTable; - } - - // Construct using akka.remote.RemoteProtocol.Endpoint.newBuilder() - private Builder() { - maybeForceBuilderInitialization(); - } - - private Builder(BuilderParent parent) { - super(parent); - maybeForceBuilderInitialization(); - } - private void maybeForceBuilderInitialization() { - if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { - } - } - private static Builder create() { - return new Builder(); - } - - public Builder clear() { - super.clear(); - host_ = ""; - bitField0_ = (bitField0_ & ~0x00000001); - port_ = 0; - bitField0_ = (bitField0_ & ~0x00000002); - return this; - } - - public Builder clone() { - return create().mergeFrom(buildPartial()); - } - - public com.google.protobuf.Descriptors.Descriptor - getDescriptorForType() { - return akka.remote.RemoteProtocol.Endpoint.getDescriptor(); - } - - public akka.remote.RemoteProtocol.Endpoint getDefaultInstanceForType() { - return akka.remote.RemoteProtocol.Endpoint.getDefaultInstance(); - } - - public akka.remote.RemoteProtocol.Endpoint build() { - akka.remote.RemoteProtocol.Endpoint result = buildPartial(); - if (!result.isInitialized()) { - throw newUninitializedMessageException(result); - } - return result; - } - - private akka.remote.RemoteProtocol.Endpoint buildParsed() - throws com.google.protobuf.InvalidProtocolBufferException { - akka.remote.RemoteProtocol.Endpoint result = buildPartial(); - if (!result.isInitialized()) { - throw newUninitializedMessageException( - result).asInvalidProtocolBufferException(); - } - return result; - } - - public akka.remote.RemoteProtocol.Endpoint buildPartial() { - akka.remote.RemoteProtocol.Endpoint result = new akka.remote.RemoteProtocol.Endpoint(this); - int from_bitField0_ = bitField0_; - int to_bitField0_ = 0; - if (((from_bitField0_ & 0x00000001) == 0x00000001)) { - to_bitField0_ |= 0x00000001; - } - result.host_ = host_; - if (((from_bitField0_ & 0x00000002) == 0x00000002)) { - to_bitField0_ |= 0x00000002; - } - result.port_ = port_; - result.bitField0_ = to_bitField0_; - onBuilt(); - return result; - } - - public Builder mergeFrom(com.google.protobuf.Message other) { - if (other instanceof akka.remote.RemoteProtocol.Endpoint) { - return mergeFrom((akka.remote.RemoteProtocol.Endpoint)other); - } else { - super.mergeFrom(other); - return this; - } - } - - public Builder mergeFrom(akka.remote.RemoteProtocol.Endpoint other) { - if (other == akka.remote.RemoteProtocol.Endpoint.getDefaultInstance()) return this; - if (other.hasHost()) { - setHost(other.getHost()); - } - if (other.hasPort()) { - setPort(other.getPort()); - } - this.mergeUnknownFields(other.getUnknownFields()); - return this; - } - - public final boolean isInitialized() { - if (!hasHost()) { - - return false; - } - if (!hasPort()) { - - return false; - } - return true; - } - - public Builder mergeFrom( - com.google.protobuf.CodedInputStream input, - com.google.protobuf.ExtensionRegistryLite extensionRegistry) - throws java.io.IOException { - com.google.protobuf.UnknownFieldSet.Builder unknownFields = - com.google.protobuf.UnknownFieldSet.newBuilder( - this.getUnknownFields()); - while (true) { - int tag = input.readTag(); - switch (tag) { - case 0: - this.setUnknownFields(unknownFields.build()); - onChanged(); - return this; - default: { - if (!parseUnknownField(input, unknownFields, - extensionRegistry, tag)) { - this.setUnknownFields(unknownFields.build()); - onChanged(); - return this; - } - break; - } - case 10: { - bitField0_ |= 0x00000001; - host_ = input.readBytes(); - break; - } - case 16: { - bitField0_ |= 0x00000002; - port_ = input.readUInt32(); - break; - } - } - } - } - - private int bitField0_; - - // required string host = 1; - private java.lang.Object host_ = ""; - public boolean hasHost() { - return ((bitField0_ & 0x00000001) == 0x00000001); - } - public String getHost() { - java.lang.Object ref = host_; - if (!(ref instanceof String)) { - String s = ((com.google.protobuf.ByteString) ref).toStringUtf8(); - host_ = s; - return s; - } else { - return (String) ref; - } - } - public Builder setHost(String value) { - if (value == null) { - throw new NullPointerException(); - } - bitField0_ |= 0x00000001; - host_ = value; - onChanged(); - return this; - } - public Builder clearHost() { - bitField0_ = (bitField0_ & ~0x00000001); - host_ = getDefaultInstance().getHost(); - onChanged(); - return this; - } - void setHost(com.google.protobuf.ByteString value) { - bitField0_ |= 0x00000001; - host_ = value; - onChanged(); - } - - // required uint32 port = 2; - private int port_ ; - public boolean hasPort() { - return ((bitField0_ & 0x00000002) == 0x00000002); - } - public int getPort() { - return port_; - } - public Builder setPort(int value) { - bitField0_ |= 0x00000002; - port_ = value; - onChanged(); - return this; - } - public Builder clearPort() { - bitField0_ = (bitField0_ & ~0x00000002); - port_ = 0; - onChanged(); - return this; - } - - // @@protoc_insertion_point(builder_scope:Endpoint) - } - - static { - defaultInstance = new Endpoint(true); - defaultInstance.initFields(); - } - - // @@protoc_insertion_point(class_scope:Endpoint) - } - public interface ActorRefProtocolOrBuilder extends com.google.protobuf.MessageOrBuilder { @@ -7252,11 +6804,6 @@ public final class RemoteProtocol { private static com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_RemoteControlProtocol_fieldAccessorTable; - private static com.google.protobuf.Descriptors.Descriptor - internal_static_Endpoint_descriptor; - private static - com.google.protobuf.GeneratedMessage.FieldAccessorTable - internal_static_Endpoint_fieldAccessorTable; private static com.google.protobuf.Descriptors.Descriptor internal_static_ActorRefProtocol_descriptor; private static @@ -7314,39 +6861,38 @@ public final class RemoteProtocol { "ssage\030\002 \001(\0132\020.MessageProtocol\022%\n\texcepti" + "on\030\003 \001(\0132\022.ExceptionProtocol\022!\n\006sender\030\004" + " \001(\0132\021.ActorRefProtocol\022(\n\010metadata\030\005 \003(" + - "\0132\026.MetadataEntryProtocol\"e\n\025RemoteContr" + + "\0132\026.MetadataEntryProtocol\"l\n\025RemoteContr" + "olProtocol\022!\n\013commandType\030\001 \002(\0162\014.Comman", - "dType\022\016\n\006cookie\030\002 \001(\t\022\031\n\006origin\030\003 \001(\0132\t." + - "Endpoint\"&\n\010Endpoint\022\014\n\004host\030\001 \002(\t\022\014\n\004po" + - "rt\030\002 \002(\r\"?\n\020ActorRefProtocol\022\017\n\007address\030" + - "\001 \002(\t\022\014\n\004host\030\002 \002(\t\022\014\n\004port\030\003 \002(\r\";\n\017Mes" + - "sageProtocol\022\017\n\007message\030\001 \002(\014\022\027\n\017message" + - "Manifest\030\002 \001(\014\")\n\014UuidProtocol\022\014\n\004high\030\001" + - " \002(\004\022\013\n\003low\030\002 \002(\004\"3\n\025MetadataEntryProtoc" + - "ol\022\013\n\003key\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"1\n\017Addres" + - "sProtocol\022\020\n\010hostname\030\001 \002(\t\022\014\n\004port\030\002 \002(" + - "\r\"7\n\021ExceptionProtocol\022\021\n\tclassname\030\001 \002(", - "\t\022\017\n\007message\030\002 \002(\t\"\256\001\n!RemoteSystemDaemo" + - "nMessageProtocol\0223\n\013messageType\030\001 \002(\0162\036." + - "RemoteSystemDaemonMessageType\022\024\n\014actorAd" + - "dress\030\002 \001(\t\022\017\n\007payload\030\003 \001(\014\022-\n\026replicat" + - "eActorFromUuid\030\004 \001(\0132\r.UuidProtocol\"y\n\035D" + - "urableMailboxMessageProtocol\022$\n\trecipien" + - "t\030\001 \002(\0132\021.ActorRefProtocol\022!\n\006sender\030\002 \001" + - "(\0132\021.ActorRefProtocol\022\017\n\007message\030\003 \002(\014*(" + - "\n\013CommandType\022\013\n\007CONNECT\020\001\022\014\n\010SHUTDOWN\020\002" + - "*K\n\026ReplicationStorageType\022\r\n\tTRANSIENT\020", - "\001\022\023\n\017TRANSACTION_LOG\020\002\022\r\n\tDATA_GRID\020\003*>\n" + - "\027ReplicationStrategyType\022\021\n\rWRITE_THROUG" + - "H\020\001\022\020\n\014WRITE_BEHIND\020\002*\241\002\n\035RemoteSystemDa" + - "emonMessageType\022\010\n\004STOP\020\001\022\007\n\003USE\020\002\022\013\n\007RE" + - "LEASE\020\003\022\022\n\016MAKE_AVAILABLE\020\004\022\024\n\020MAKE_UNAV" + - "AILABLE\020\005\022\016\n\nDISCONNECT\020\006\022\r\n\tRECONNECT\020\007" + - "\022\n\n\006RESIGN\020\010\022\n\n\006GOSSIP\020\t\022\031\n\025FAIL_OVER_CO" + - "NNECTIONS\020\024\022\026\n\022FUNCTION_FUN0_UNIT\020\025\022\025\n\021F" + - "UNCTION_FUN0_ANY\020\026\022\032\n\026FUNCTION_FUN1_ARG_" + - "UNIT\020\027\022\031\n\025FUNCTION_FUN1_ARG_ANY\020\030B\017\n\013akk", - "a.remoteH\001" + "dType\022\016\n\006cookie\030\002 \001(\t\022 \n\006origin\030\003 \001(\0132\020." + + "AddressProtocol\"?\n\020ActorRefProtocol\022\017\n\007a" + + "ddress\030\001 \002(\t\022\014\n\004host\030\002 \002(\t\022\014\n\004port\030\003 \002(\r" + + "\";\n\017MessageProtocol\022\017\n\007message\030\001 \002(\014\022\027\n\017" + + "messageManifest\030\002 \001(\014\")\n\014UuidProtocol\022\014\n" + + "\004high\030\001 \002(\004\022\013\n\003low\030\002 \002(\004\"3\n\025MetadataEntr" + + "yProtocol\022\013\n\003key\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"1\n" + + "\017AddressProtocol\022\020\n\010hostname\030\001 \002(\t\022\014\n\004po" + + "rt\030\002 \002(\r\"7\n\021ExceptionProtocol\022\021\n\tclassna" + + "me\030\001 \002(\t\022\017\n\007message\030\002 \002(\t\"\256\001\n!RemoteSyst", + "emDaemonMessageProtocol\0223\n\013messageType\030\001" + + " \002(\0162\036.RemoteSystemDaemonMessageType\022\024\n\014" + + "actorAddress\030\002 \001(\t\022\017\n\007payload\030\003 \001(\014\022-\n\026r" + + "eplicateActorFromUuid\030\004 \001(\0132\r.UuidProtoc" + + "ol\"y\n\035DurableMailboxMessageProtocol\022$\n\tr" + + "ecipient\030\001 \002(\0132\021.ActorRefProtocol\022!\n\006sen" + + "der\030\002 \001(\0132\021.ActorRefProtocol\022\017\n\007message\030" + + "\003 \002(\014*(\n\013CommandType\022\013\n\007CONNECT\020\001\022\014\n\010SHU" + + "TDOWN\020\002*K\n\026ReplicationStorageType\022\r\n\tTRA" + + "NSIENT\020\001\022\023\n\017TRANSACTION_LOG\020\002\022\r\n\tDATA_GR", + "ID\020\003*>\n\027ReplicationStrategyType\022\021\n\rWRITE" + + "_THROUGH\020\001\022\020\n\014WRITE_BEHIND\020\002*\241\002\n\035RemoteS" + + "ystemDaemonMessageType\022\010\n\004STOP\020\001\022\007\n\003USE\020" + + "\002\022\013\n\007RELEASE\020\003\022\022\n\016MAKE_AVAILABLE\020\004\022\024\n\020MA" + + "KE_UNAVAILABLE\020\005\022\016\n\nDISCONNECT\020\006\022\r\n\tRECO" + + "NNECT\020\007\022\n\n\006RESIGN\020\010\022\n\n\006GOSSIP\020\t\022\031\n\025FAIL_" + + "OVER_CONNECTIONS\020\024\022\026\n\022FUNCTION_FUN0_UNIT" + + "\020\025\022\025\n\021FUNCTION_FUN0_ANY\020\026\022\032\n\026FUNCTION_FU" + + "N1_ARG_UNIT\020\027\022\031\n\025FUNCTION_FUN1_ARG_ANY\020\030" + + "B\017\n\013akka.remoteH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -7377,16 +6923,8 @@ public final class RemoteProtocol { new java.lang.String[] { "CommandType", "Cookie", "Origin", }, akka.remote.RemoteProtocol.RemoteControlProtocol.class, akka.remote.RemoteProtocol.RemoteControlProtocol.Builder.class); - internal_static_Endpoint_descriptor = - getDescriptor().getMessageTypes().get(3); - internal_static_Endpoint_fieldAccessorTable = new - com.google.protobuf.GeneratedMessage.FieldAccessorTable( - internal_static_Endpoint_descriptor, - new java.lang.String[] { "Host", "Port", }, - akka.remote.RemoteProtocol.Endpoint.class, - akka.remote.RemoteProtocol.Endpoint.Builder.class); internal_static_ActorRefProtocol_descriptor = - getDescriptor().getMessageTypes().get(4); + getDescriptor().getMessageTypes().get(3); internal_static_ActorRefProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ActorRefProtocol_descriptor, @@ -7394,7 +6932,7 @@ public final class RemoteProtocol { akka.remote.RemoteProtocol.ActorRefProtocol.class, akka.remote.RemoteProtocol.ActorRefProtocol.Builder.class); internal_static_MessageProtocol_descriptor = - getDescriptor().getMessageTypes().get(5); + getDescriptor().getMessageTypes().get(4); internal_static_MessageProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_MessageProtocol_descriptor, @@ -7402,7 +6940,7 @@ public final class RemoteProtocol { akka.remote.RemoteProtocol.MessageProtocol.class, akka.remote.RemoteProtocol.MessageProtocol.Builder.class); internal_static_UuidProtocol_descriptor = - getDescriptor().getMessageTypes().get(6); + getDescriptor().getMessageTypes().get(5); internal_static_UuidProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_UuidProtocol_descriptor, @@ -7410,7 +6948,7 @@ public final class RemoteProtocol { akka.remote.RemoteProtocol.UuidProtocol.class, akka.remote.RemoteProtocol.UuidProtocol.Builder.class); internal_static_MetadataEntryProtocol_descriptor = - getDescriptor().getMessageTypes().get(7); + getDescriptor().getMessageTypes().get(6); internal_static_MetadataEntryProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_MetadataEntryProtocol_descriptor, @@ -7418,7 +6956,7 @@ public final class RemoteProtocol { akka.remote.RemoteProtocol.MetadataEntryProtocol.class, akka.remote.RemoteProtocol.MetadataEntryProtocol.Builder.class); internal_static_AddressProtocol_descriptor = - getDescriptor().getMessageTypes().get(8); + getDescriptor().getMessageTypes().get(7); internal_static_AddressProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_AddressProtocol_descriptor, @@ -7426,7 +6964,7 @@ public final class RemoteProtocol { akka.remote.RemoteProtocol.AddressProtocol.class, akka.remote.RemoteProtocol.AddressProtocol.Builder.class); internal_static_ExceptionProtocol_descriptor = - getDescriptor().getMessageTypes().get(9); + getDescriptor().getMessageTypes().get(8); internal_static_ExceptionProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ExceptionProtocol_descriptor, @@ -7434,7 +6972,7 @@ public final class RemoteProtocol { akka.remote.RemoteProtocol.ExceptionProtocol.class, akka.remote.RemoteProtocol.ExceptionProtocol.Builder.class); internal_static_RemoteSystemDaemonMessageProtocol_descriptor = - getDescriptor().getMessageTypes().get(10); + getDescriptor().getMessageTypes().get(9); internal_static_RemoteSystemDaemonMessageProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_RemoteSystemDaemonMessageProtocol_descriptor, @@ -7442,7 +6980,7 @@ public final class RemoteProtocol { akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol.class, akka.remote.RemoteProtocol.RemoteSystemDaemonMessageProtocol.Builder.class); internal_static_DurableMailboxMessageProtocol_descriptor = - getDescriptor().getMessageTypes().get(11); + getDescriptor().getMessageTypes().get(10); internal_static_DurableMailboxMessageProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_DurableMailboxMessageProtocol_descriptor, diff --git a/akka-remote/src/main/protocol/RemoteProtocol.proto b/akka-remote/src/main/protocol/RemoteProtocol.proto index 9d6ad23bd7..d777009950 100644 --- a/akka-remote/src/main/protocol/RemoteProtocol.proto +++ b/akka-remote/src/main/protocol/RemoteProtocol.proto @@ -33,12 +33,7 @@ message RemoteMessageProtocol { message RemoteControlProtocol { required CommandType commandType = 1; optional string cookie = 2; - optional Endpoint origin = 3; -} - -message Endpoint { - required string host = 1; - required uint32 port = 2; + optional AddressProtocol origin = 3; } /** diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index f70e479a11..0a53cce810 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -210,7 +210,7 @@ class ActiveRemoteClient private[akka] ( def sendSecureCookie(connection: ChannelFuture) { val handshake = RemoteControlProtocol.newBuilder.setCommandType(CommandType.CONNECT) if (SECURE_COOKIE.nonEmpty) handshake.setCookie(SECURE_COOKIE.get) - handshake.setOrigin(RemoteProtocol.Endpoint.newBuilder().setHost(app.hostname).setPort(app.port).build) + handshake.setOrigin(RemoteProtocol.AddressProtocol.newBuilder().setHostname(app.hostname).setPort(app.port).build) connection.getChannel.write(createControlEnvelope(handshake.build)) } @@ -646,14 +646,13 @@ class RemoteServerHandler( event.getMessage match { case null ⇒ throw new IllegalActorStateException("Message in remote MessageEvent is null [" + event + "]") case remote: AkkaRemoteProtocol if remote.hasMessage ⇒ handleRemoteMessageProtocol(remote.getMessage, event.getChannel) - //case remote: AkkaRemoteProtocol if remote.hasInstruction => RemoteServer cannot receive control messages (yet) + case remote: AkkaRemoteProtocol if remote.hasInstruction ⇒ //Doesn't handle instructions case _ ⇒ //ignore } } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { app.eventHandler.error(event.getCause, this, "Unexpected exception from remote downstream") - event.getChannel.close server.notifyListeners(RemoteServerError(event.getCause, server)) }