From 08670ca1552a36d9fda148847b67d8dacd906e62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martynas=20Mickevi=C4=8Dius?= Date: Tue, 20 Sep 2016 14:23:50 +0300 Subject: [PATCH] Add non-local recipient check when handling HandshaleReq (#21497) --- .../akka/remote/ArteryControlFormats.java | 790 +++++++++++++++++- .../main/protobuf/ArteryControlFormats.proto | 6 +- .../scala/akka/remote/artery/Codecs.scala | 9 +- .../scala/akka/remote/artery/Handshake.scala | 27 +- .../akka/remote/artery/InboundEnvelope.scala | 30 +- .../remote/artery/MessageDispatcher.scala | 9 +- .../remote/artery/SystemMessageDelivery.scala | 2 - .../ArteryMessageSerializer.scala | 15 +- .../remote/artery/HandshakeDenySpec.scala | 48 ++ .../remote/artery/HandshakeRetrySpec.scala | 3 +- .../artery/InboundControlJunctionSpec.scala | 7 +- .../remote/artery/InboundHandshakeSpec.scala | 11 +- .../remote/artery/OutboundHandshakeSpec.scala | 18 +- .../artery/SystemMessageAckerSpec.scala | 2 +- .../artery/SystemMessageDeliverySpec.scala | 6 +- .../akka/remote/artery/TestContext.scala | 4 +- .../ArteryMessageSerializerSpec.scala | 2 +- 17 files changed, 892 insertions(+), 97 deletions(-) create mode 100644 akka-remote/src/test/scala/akka/remote/artery/HandshakeDenySpec.scala diff --git a/akka-remote/src/main/java/akka/remote/ArteryControlFormats.java b/akka-remote/src/main/java/akka/remote/ArteryControlFormats.java index eb176e91ba..22deb5c3cd 100644 --- a/akka-remote/src/main/java/akka/remote/ArteryControlFormats.java +++ b/akka-remote/src/main/java/akka/remote/ArteryControlFormats.java @@ -764,7 +764,6 @@ public final class ArteryControlFormats { * address field. A message that needs to changed later can be cloned from this one and then adapted. * ActorSystemTerminating * ActorSystemTerminating.Ack - * OutboundHandshake.HandshakeReq * OutboundHandshake.HandshakeRsp * */ @@ -1016,7 +1015,6 @@ public final class ArteryControlFormats { * address field. A message that needs to changed later can be cloned from this one and then adapted. * ActorSystemTerminating * ActorSystemTerminating.Ack - * OutboundHandshake.HandshakeReq * OutboundHandshake.HandshakeRsp * */ @@ -1280,6 +1278,737 @@ public final class ArteryControlFormats { // @@protoc_insertion_point(class_scope:MessageWithAddress) } + public interface HandshakeReqOrBuilder + extends akka.protobuf.MessageOrBuilder { + + // required .UniqueAddress from = 1; + /** + * required .UniqueAddress from = 1; + */ + boolean hasFrom(); + /** + * required .UniqueAddress from = 1; + */ + akka.remote.ArteryControlFormats.UniqueAddress getFrom(); + /** + * required .UniqueAddress from = 1; + */ + akka.remote.ArteryControlFormats.UniqueAddressOrBuilder getFromOrBuilder(); + + // required .Address to = 2; + /** + * required .Address to = 2; + */ + boolean hasTo(); + /** + * required .Address to = 2; + */ + akka.remote.ArteryControlFormats.Address getTo(); + /** + * required .Address to = 2; + */ + akka.remote.ArteryControlFormats.AddressOrBuilder getToOrBuilder(); + } + /** + * Protobuf type {@code HandshakeReq} + */ + public static final class HandshakeReq extends + akka.protobuf.GeneratedMessage + implements HandshakeReqOrBuilder { + // Use HandshakeReq.newBuilder() to construct. + private HandshakeReq(akka.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private HandshakeReq(boolean noInit) { this.unknownFields = akka.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final HandshakeReq defaultInstance; + public static HandshakeReq getDefaultInstance() { + return defaultInstance; + } + + public HandshakeReq getDefaultInstanceForType() { + return defaultInstance; + } + + private final akka.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final akka.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private HandshakeReq( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + akka.protobuf.UnknownFieldSet.Builder unknownFields = + akka.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 10: { + akka.remote.ArteryControlFormats.UniqueAddress.Builder subBuilder = null; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + subBuilder = from_.toBuilder(); + } + from_ = input.readMessage(akka.remote.ArteryControlFormats.UniqueAddress.PARSER, extensionRegistry); + if (subBuilder != null) { + subBuilder.mergeFrom(from_); + from_ = subBuilder.buildPartial(); + } + bitField0_ |= 0x00000001; + break; + } + case 18: { + akka.remote.ArteryControlFormats.Address.Builder subBuilder = null; + if (((bitField0_ & 0x00000002) == 0x00000002)) { + subBuilder = to_.toBuilder(); + } + to_ = input.readMessage(akka.remote.ArteryControlFormats.Address.PARSER, extensionRegistry); + if (subBuilder != null) { + subBuilder.mergeFrom(to_); + to_ = subBuilder.buildPartial(); + } + bitField0_ |= 0x00000002; + break; + } + } + } + } catch (akka.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new akka.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final akka.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.remote.ArteryControlFormats.internal_static_HandshakeReq_descriptor; + } + + protected akka.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.remote.ArteryControlFormats.internal_static_HandshakeReq_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.remote.ArteryControlFormats.HandshakeReq.class, akka.remote.ArteryControlFormats.HandshakeReq.Builder.class); + } + + public static akka.protobuf.Parser PARSER = + new akka.protobuf.AbstractParser() { + public HandshakeReq parsePartialFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return new HandshakeReq(input, extensionRegistry); + } + }; + + @java.lang.Override + public akka.protobuf.Parser getParserForType() { + return PARSER; + } + + private int bitField0_; + // required .UniqueAddress from = 1; + public static final int FROM_FIELD_NUMBER = 1; + private akka.remote.ArteryControlFormats.UniqueAddress from_; + /** + * required .UniqueAddress from = 1; + */ + public boolean hasFrom() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required .UniqueAddress from = 1; + */ + public akka.remote.ArteryControlFormats.UniqueAddress getFrom() { + return from_; + } + /** + * required .UniqueAddress from = 1; + */ + public akka.remote.ArteryControlFormats.UniqueAddressOrBuilder getFromOrBuilder() { + return from_; + } + + // required .Address to = 2; + public static final int TO_FIELD_NUMBER = 2; + private akka.remote.ArteryControlFormats.Address to_; + /** + * required .Address to = 2; + */ + public boolean hasTo() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * required .Address to = 2; + */ + public akka.remote.ArteryControlFormats.Address getTo() { + return to_; + } + /** + * required .Address to = 2; + */ + public akka.remote.ArteryControlFormats.AddressOrBuilder getToOrBuilder() { + return to_; + } + + private void initFields() { + from_ = akka.remote.ArteryControlFormats.UniqueAddress.getDefaultInstance(); + to_ = akka.remote.ArteryControlFormats.Address.getDefaultInstance(); + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasFrom()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasTo()) { + memoizedIsInitialized = 0; + return false; + } + if (!getFrom().isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + if (!getTo().isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(akka.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeMessage(1, from_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeMessage(2, to_); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += akka.protobuf.CodedOutputStream + .computeMessageSize(1, from_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += akka.protobuf.CodedOutputStream + .computeMessageSize(2, to_); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static akka.remote.ArteryControlFormats.HandshakeReq parseFrom( + akka.protobuf.ByteString data) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.remote.ArteryControlFormats.HandshakeReq parseFrom( + akka.protobuf.ByteString data, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.remote.ArteryControlFormats.HandshakeReq parseFrom(byte[] data) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.remote.ArteryControlFormats.HandshakeReq parseFrom( + byte[] data, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.remote.ArteryControlFormats.HandshakeReq parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.remote.ArteryControlFormats.HandshakeReq parseFrom( + java.io.InputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static akka.remote.ArteryControlFormats.HandshakeReq parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static akka.remote.ArteryControlFormats.HandshakeReq parseDelimitedFrom( + java.io.InputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static akka.remote.ArteryControlFormats.HandshakeReq parseFrom( + akka.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.remote.ArteryControlFormats.HandshakeReq parseFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(akka.remote.ArteryControlFormats.HandshakeReq prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + akka.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code HandshakeReq} + */ + public static final class Builder extends + akka.protobuf.GeneratedMessage.Builder + implements akka.remote.ArteryControlFormats.HandshakeReqOrBuilder { + public static final akka.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.remote.ArteryControlFormats.internal_static_HandshakeReq_descriptor; + } + + protected akka.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.remote.ArteryControlFormats.internal_static_HandshakeReq_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.remote.ArteryControlFormats.HandshakeReq.class, akka.remote.ArteryControlFormats.HandshakeReq.Builder.class); + } + + // Construct using akka.remote.ArteryControlFormats.HandshakeReq.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + akka.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + getFromFieldBuilder(); + getToFieldBuilder(); + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + if (fromBuilder_ == null) { + from_ = akka.remote.ArteryControlFormats.UniqueAddress.getDefaultInstance(); + } else { + fromBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000001); + if (toBuilder_ == null) { + to_ = akka.remote.ArteryControlFormats.Address.getDefaultInstance(); + } else { + toBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000002); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public akka.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return akka.remote.ArteryControlFormats.internal_static_HandshakeReq_descriptor; + } + + public akka.remote.ArteryControlFormats.HandshakeReq getDefaultInstanceForType() { + return akka.remote.ArteryControlFormats.HandshakeReq.getDefaultInstance(); + } + + public akka.remote.ArteryControlFormats.HandshakeReq build() { + akka.remote.ArteryControlFormats.HandshakeReq result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public akka.remote.ArteryControlFormats.HandshakeReq buildPartial() { + akka.remote.ArteryControlFormats.HandshakeReq result = new akka.remote.ArteryControlFormats.HandshakeReq(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + if (fromBuilder_ == null) { + result.from_ = from_; + } else { + result.from_ = fromBuilder_.build(); + } + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + if (toBuilder_ == null) { + result.to_ = to_; + } else { + result.to_ = toBuilder_.build(); + } + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(akka.protobuf.Message other) { + if (other instanceof akka.remote.ArteryControlFormats.HandshakeReq) { + return mergeFrom((akka.remote.ArteryControlFormats.HandshakeReq)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(akka.remote.ArteryControlFormats.HandshakeReq other) { + if (other == akka.remote.ArteryControlFormats.HandshakeReq.getDefaultInstance()) return this; + if (other.hasFrom()) { + mergeFrom(other.getFrom()); + } + if (other.hasTo()) { + mergeTo(other.getTo()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasFrom()) { + + return false; + } + if (!hasTo()) { + + return false; + } + if (!getFrom().isInitialized()) { + + return false; + } + if (!getTo().isInitialized()) { + + return false; + } + return true; + } + + public Builder mergeFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + akka.remote.ArteryControlFormats.HandshakeReq parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (akka.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (akka.remote.ArteryControlFormats.HandshakeReq) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // required .UniqueAddress from = 1; + private akka.remote.ArteryControlFormats.UniqueAddress from_ = akka.remote.ArteryControlFormats.UniqueAddress.getDefaultInstance(); + private akka.protobuf.SingleFieldBuilder< + akka.remote.ArteryControlFormats.UniqueAddress, akka.remote.ArteryControlFormats.UniqueAddress.Builder, akka.remote.ArteryControlFormats.UniqueAddressOrBuilder> fromBuilder_; + /** + * required .UniqueAddress from = 1; + */ + public boolean hasFrom() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required .UniqueAddress from = 1; + */ + public akka.remote.ArteryControlFormats.UniqueAddress getFrom() { + if (fromBuilder_ == null) { + return from_; + } else { + return fromBuilder_.getMessage(); + } + } + /** + * required .UniqueAddress from = 1; + */ + public Builder setFrom(akka.remote.ArteryControlFormats.UniqueAddress value) { + if (fromBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + from_ = value; + onChanged(); + } else { + fromBuilder_.setMessage(value); + } + bitField0_ |= 0x00000001; + return this; + } + /** + * required .UniqueAddress from = 1; + */ + public Builder setFrom( + akka.remote.ArteryControlFormats.UniqueAddress.Builder builderForValue) { + if (fromBuilder_ == null) { + from_ = builderForValue.build(); + onChanged(); + } else { + fromBuilder_.setMessage(builderForValue.build()); + } + bitField0_ |= 0x00000001; + return this; + } + /** + * required .UniqueAddress from = 1; + */ + public Builder mergeFrom(akka.remote.ArteryControlFormats.UniqueAddress value) { + if (fromBuilder_ == null) { + if (((bitField0_ & 0x00000001) == 0x00000001) && + from_ != akka.remote.ArteryControlFormats.UniqueAddress.getDefaultInstance()) { + from_ = + akka.remote.ArteryControlFormats.UniqueAddress.newBuilder(from_).mergeFrom(value).buildPartial(); + } else { + from_ = value; + } + onChanged(); + } else { + fromBuilder_.mergeFrom(value); + } + bitField0_ |= 0x00000001; + return this; + } + /** + * required .UniqueAddress from = 1; + */ + public Builder clearFrom() { + if (fromBuilder_ == null) { + from_ = akka.remote.ArteryControlFormats.UniqueAddress.getDefaultInstance(); + onChanged(); + } else { + fromBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000001); + return this; + } + /** + * required .UniqueAddress from = 1; + */ + public akka.remote.ArteryControlFormats.UniqueAddress.Builder getFromBuilder() { + bitField0_ |= 0x00000001; + onChanged(); + return getFromFieldBuilder().getBuilder(); + } + /** + * required .UniqueAddress from = 1; + */ + public akka.remote.ArteryControlFormats.UniqueAddressOrBuilder getFromOrBuilder() { + if (fromBuilder_ != null) { + return fromBuilder_.getMessageOrBuilder(); + } else { + return from_; + } + } + /** + * required .UniqueAddress from = 1; + */ + private akka.protobuf.SingleFieldBuilder< + akka.remote.ArteryControlFormats.UniqueAddress, akka.remote.ArteryControlFormats.UniqueAddress.Builder, akka.remote.ArteryControlFormats.UniqueAddressOrBuilder> + getFromFieldBuilder() { + if (fromBuilder_ == null) { + fromBuilder_ = new akka.protobuf.SingleFieldBuilder< + akka.remote.ArteryControlFormats.UniqueAddress, akka.remote.ArteryControlFormats.UniqueAddress.Builder, akka.remote.ArteryControlFormats.UniqueAddressOrBuilder>( + from_, + getParentForChildren(), + isClean()); + from_ = null; + } + return fromBuilder_; + } + + // required .Address to = 2; + private akka.remote.ArteryControlFormats.Address to_ = akka.remote.ArteryControlFormats.Address.getDefaultInstance(); + private akka.protobuf.SingleFieldBuilder< + akka.remote.ArteryControlFormats.Address, akka.remote.ArteryControlFormats.Address.Builder, akka.remote.ArteryControlFormats.AddressOrBuilder> toBuilder_; + /** + * required .Address to = 2; + */ + public boolean hasTo() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * required .Address to = 2; + */ + public akka.remote.ArteryControlFormats.Address getTo() { + if (toBuilder_ == null) { + return to_; + } else { + return toBuilder_.getMessage(); + } + } + /** + * required .Address to = 2; + */ + public Builder setTo(akka.remote.ArteryControlFormats.Address value) { + if (toBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + to_ = value; + onChanged(); + } else { + toBuilder_.setMessage(value); + } + bitField0_ |= 0x00000002; + return this; + } + /** + * required .Address to = 2; + */ + public Builder setTo( + akka.remote.ArteryControlFormats.Address.Builder builderForValue) { + if (toBuilder_ == null) { + to_ = builderForValue.build(); + onChanged(); + } else { + toBuilder_.setMessage(builderForValue.build()); + } + bitField0_ |= 0x00000002; + return this; + } + /** + * required .Address to = 2; + */ + public Builder mergeTo(akka.remote.ArteryControlFormats.Address value) { + if (toBuilder_ == null) { + if (((bitField0_ & 0x00000002) == 0x00000002) && + to_ != akka.remote.ArteryControlFormats.Address.getDefaultInstance()) { + to_ = + akka.remote.ArteryControlFormats.Address.newBuilder(to_).mergeFrom(value).buildPartial(); + } else { + to_ = value; + } + onChanged(); + } else { + toBuilder_.mergeFrom(value); + } + bitField0_ |= 0x00000002; + return this; + } + /** + * required .Address to = 2; + */ + public Builder clearTo() { + if (toBuilder_ == null) { + to_ = akka.remote.ArteryControlFormats.Address.getDefaultInstance(); + onChanged(); + } else { + toBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000002); + return this; + } + /** + * required .Address to = 2; + */ + public akka.remote.ArteryControlFormats.Address.Builder getToBuilder() { + bitField0_ |= 0x00000002; + onChanged(); + return getToFieldBuilder().getBuilder(); + } + /** + * required .Address to = 2; + */ + public akka.remote.ArteryControlFormats.AddressOrBuilder getToOrBuilder() { + if (toBuilder_ != null) { + return toBuilder_.getMessageOrBuilder(); + } else { + return to_; + } + } + /** + * required .Address to = 2; + */ + private akka.protobuf.SingleFieldBuilder< + akka.remote.ArteryControlFormats.Address, akka.remote.ArteryControlFormats.Address.Builder, akka.remote.ArteryControlFormats.AddressOrBuilder> + getToFieldBuilder() { + if (toBuilder_ == null) { + toBuilder_ = new akka.protobuf.SingleFieldBuilder< + akka.remote.ArteryControlFormats.Address, akka.remote.ArteryControlFormats.Address.Builder, akka.remote.ArteryControlFormats.AddressOrBuilder>( + to_, + getParentForChildren(), + isClean()); + to_ = null; + } + return toBuilder_; + } + + // @@protoc_insertion_point(builder_scope:HandshakeReq) + } + + static { + defaultInstance = new HandshakeReq(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:HandshakeReq) + } + public interface CompressionTableAdvertisementOrBuilder extends akka.protobuf.MessageOrBuilder { @@ -6102,6 +6831,11 @@ public final class ArteryControlFormats { private static akka.protobuf.GeneratedMessage.FieldAccessorTable internal_static_MessageWithAddress_fieldAccessorTable; + private static akka.protobuf.Descriptors.Descriptor + internal_static_HandshakeReq_descriptor; + private static + akka.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_HandshakeReq_fieldAccessorTable; private static akka.protobuf.Descriptors.Descriptor internal_static_CompressionTableAdvertisement_descriptor; private static @@ -6144,22 +6878,24 @@ public final class ArteryControlFormats { "\n\032ArteryControlFormats.proto\"G\n\013Quaranti" + "ned\022\034\n\004from\030\001 \002(\0132\016.UniqueAddress\022\032\n\002to\030" + "\002 \002(\0132\016.UniqueAddress\"5\n\022MessageWithAddr" + - "ess\022\037\n\007address\030\001 \002(\0132\016.UniqueAddress\"\204\001\n" + - "\035CompressionTableAdvertisement\022\034\n\004from\030\001" + - " \002(\0132\016.UniqueAddress\022\021\n\toriginUid\030\002 \002(\004\022" + - "\024\n\014tableVersion\030\003 \002(\r\022\014\n\004keys\030\004 \003(\t\022\016\n\006v" + - "alues\030\005 \003(\r\"Q\n CompressionTableAdvertise" + - "mentAck\022\034\n\004from\030\001 \002(\0132\016.UniqueAddress\022\017\n" + - "\007version\030\002 \002(\r\"\212\001\n\025SystemMessageEnvelope", - "\022\017\n\007message\030\001 \002(\014\022\024\n\014serializerId\030\002 \002(\005\022" + - "\027\n\017messageManifest\030\003 \001(\014\022\r\n\005seqNo\030\004 \002(\004\022" + - "\"\n\nackReplyTo\030\005 \002(\0132\016.UniqueAddress\"G\n\030S" + - "ystemMessageDeliveryAck\022\r\n\005seqNo\030\001 \002(\004\022\034" + - "\n\004from\030\002 \002(\0132\016.UniqueAddress\"K\n\007Address\022" + - "\020\n\010protocol\030\001 \002(\t\022\016\n\006system\030\002 \002(\t\022\020\n\010hos" + - "tname\030\003 \002(\t\022\014\n\004port\030\004 \002(\r\"7\n\rUniqueAddre" + - "ss\022\031\n\007address\030\001 \002(\0132\010.Address\022\013\n\003uid\030\002 \002" + - "(\004B\017\n\013akka.remoteH\001" + "ess\022\037\n\007address\030\001 \002(\0132\016.UniqueAddress\"B\n\014" + + "HandshakeReq\022\034\n\004from\030\001 \002(\0132\016.UniqueAddre" + + "ss\022\024\n\002to\030\002 \002(\0132\010.Address\"\204\001\n\035Compression" + + "TableAdvertisement\022\034\n\004from\030\001 \002(\0132\016.Uniqu" + + "eAddress\022\021\n\toriginUid\030\002 \002(\004\022\024\n\014tableVers" + + "ion\030\003 \002(\r\022\014\n\004keys\030\004 \003(\t\022\016\n\006values\030\005 \003(\r\"" + + "Q\n CompressionTableAdvertisementAck\022\034\n\004f", + "rom\030\001 \002(\0132\016.UniqueAddress\022\017\n\007version\030\002 \002" + + "(\r\"\212\001\n\025SystemMessageEnvelope\022\017\n\007message\030" + + "\001 \002(\014\022\024\n\014serializerId\030\002 \002(\005\022\027\n\017messageMa" + + "nifest\030\003 \001(\014\022\r\n\005seqNo\030\004 \002(\004\022\"\n\nackReplyT" + + "o\030\005 \002(\0132\016.UniqueAddress\"G\n\030SystemMessage" + + "DeliveryAck\022\r\n\005seqNo\030\001 \002(\004\022\034\n\004from\030\002 \002(\013" + + "2\016.UniqueAddress\"K\n\007Address\022\020\n\010protocol\030" + + "\001 \002(\t\022\016\n\006system\030\002 \002(\t\022\020\n\010hostname\030\003 \002(\t\022" + + "\014\n\004port\030\004 \002(\r\"7\n\rUniqueAddress\022\031\n\007addres" + + "s\030\001 \002(\0132\010.Address\022\013\n\003uid\030\002 \002(\004B\017\n\013akka.r", + "emoteH\001" }; akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -6178,38 +6914,44 @@ public final class ArteryControlFormats { akka.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_MessageWithAddress_descriptor, new java.lang.String[] { "Address", }); - internal_static_CompressionTableAdvertisement_descriptor = + internal_static_HandshakeReq_descriptor = getDescriptor().getMessageTypes().get(2); + internal_static_HandshakeReq_fieldAccessorTable = new + akka.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_HandshakeReq_descriptor, + new java.lang.String[] { "From", "To", }); + internal_static_CompressionTableAdvertisement_descriptor = + getDescriptor().getMessageTypes().get(3); internal_static_CompressionTableAdvertisement_fieldAccessorTable = new akka.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_CompressionTableAdvertisement_descriptor, new java.lang.String[] { "From", "OriginUid", "TableVersion", "Keys", "Values", }); internal_static_CompressionTableAdvertisementAck_descriptor = - getDescriptor().getMessageTypes().get(3); + getDescriptor().getMessageTypes().get(4); internal_static_CompressionTableAdvertisementAck_fieldAccessorTable = new akka.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_CompressionTableAdvertisementAck_descriptor, new java.lang.String[] { "From", "Version", }); internal_static_SystemMessageEnvelope_descriptor = - getDescriptor().getMessageTypes().get(4); + getDescriptor().getMessageTypes().get(5); internal_static_SystemMessageEnvelope_fieldAccessorTable = new akka.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_SystemMessageEnvelope_descriptor, new java.lang.String[] { "Message", "SerializerId", "MessageManifest", "SeqNo", "AckReplyTo", }); internal_static_SystemMessageDeliveryAck_descriptor = - getDescriptor().getMessageTypes().get(5); + getDescriptor().getMessageTypes().get(6); internal_static_SystemMessageDeliveryAck_fieldAccessorTable = new akka.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_SystemMessageDeliveryAck_descriptor, new java.lang.String[] { "SeqNo", "From", }); internal_static_Address_descriptor = - getDescriptor().getMessageTypes().get(6); + getDescriptor().getMessageTypes().get(7); internal_static_Address_fieldAccessorTable = new akka.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_Address_descriptor, new java.lang.String[] { "Protocol", "System", "Hostname", "Port", }); internal_static_UniqueAddress_descriptor = - getDescriptor().getMessageTypes().get(7); + getDescriptor().getMessageTypes().get(8); internal_static_UniqueAddress_fieldAccessorTable = new akka.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_UniqueAddress_descriptor, diff --git a/akka-remote/src/main/protobuf/ArteryControlFormats.proto b/akka-remote/src/main/protobuf/ArteryControlFormats.proto index aafff98016..d71d66fe63 100644 --- a/akka-remote/src/main/protobuf/ArteryControlFormats.proto +++ b/akka-remote/src/main/protobuf/ArteryControlFormats.proto @@ -14,12 +14,16 @@ message Quarantined { // address field. A message that needs to changed later can be cloned from this one and then adapted. // ActorSystemTerminating // ActorSystemTerminating.Ack -// OutboundHandshake.HandshakeReq // OutboundHandshake.HandshakeRsp message MessageWithAddress { required UniqueAddress address = 1; } +message HandshakeReq { + required UniqueAddress from = 1; + required Address to = 2; +} + // CompressionProtocol.ActorRefCompressionAdvertisement // CompressionProtocol.ClassManifestCompressionAdvertisement message CompressionTableAdvertisement { diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index 58474c3be3..a470e5ae89 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -11,7 +11,7 @@ import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope import akka.serialization.{ Serialization, SerializationExtension } import akka.stream._ import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } -import akka.util.{ ByteString, OptionVal, PrettyByteString } +import akka.util.{ ByteString, OptionVal } import akka.actor.EmptyLocalActorRef import akka.remote.artery.compress.InboundCompressions import akka.stream.stage.TimerGraphStageLogic @@ -24,8 +24,6 @@ import akka.stream.stage.GraphStageWithMaterializedValue import scala.concurrent.Promise -import scala.annotation.switch - /** * INTERNAL API */ @@ -378,7 +376,6 @@ private[remote] class Decoder( val decoded = inEnvelopePool.acquire().init( recipient, - localAddress, // FIXME: this is used for the "non-local recipient" check in MessageDispatcher. Is this needed anymore? sender, originUid, headerBuilder.serializer, @@ -405,8 +402,9 @@ private[remote] class Decoder( scheduleOnce(RetryResolveRemoteDeployedRecipient( retryResolveRemoteDeployedRecipientAttempts, recipientActorRefPath, decoded), retryResolveRemoteDeployedRecipientInterval) - } else + } else { push(out, decoded) + } } } @@ -537,4 +535,3 @@ private[remote] class Deserializer( setHandlers(in, out, this) } } - diff --git a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala index 801547d87f..3448c26e78 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala @@ -20,6 +20,7 @@ import akka.stream.stage.TimerGraphStageLogic import akka.util.OptionVal import akka.Done import scala.concurrent.Future +import akka.actor.Address /** * INTERNAL API @@ -32,7 +33,7 @@ private[akka] object OutboundHandshake { */ class HandshakeTimeoutException(msg: String) extends RuntimeException(msg) with NoStackTrace - final case class HandshakeReq(from: UniqueAddress) extends ControlMessage + final case class HandshakeReq(from: UniqueAddress, to: Address) extends ControlMessage final case class HandshakeRsp(from: UniqueAddress) extends Reply private sealed trait HandshakeState @@ -130,7 +131,7 @@ private[akka] class OutboundHandshake( injectHandshakeTickScheduled = true scheduleOnce(InjectHandshakeTick, injectHandshakeInterval) val env: OutboundEnvelope = outboundEnvelopePool.acquire().init( - recipient = OptionVal.None, message = HandshakeReq(outboundContext.localAddress), sender = OptionVal.None) + recipient = OptionVal.None, message = HandshakeReq(outboundContext.localAddress, outboundContext.remoteAddress), sender = OptionVal.None) push(out, env) } @@ -176,7 +177,7 @@ private[akka] class InboundHandshake(inboundContext: InboundContext, inControlSt override def onPush(): Unit = { val env = grab(in) env.message match { - case HandshakeReq(from) ⇒ onHandshakeReq(from) + case HandshakeReq(from, to) ⇒ onHandshakeReq(from, to) case HandshakeRsp(from) ⇒ after(inboundContext.completeHandshake(from)) { pull(in) @@ -191,16 +192,28 @@ private[akka] class InboundHandshake(inboundContext: InboundContext, inControlSt override def onPush(): Unit = { val env = grab(in) env.message match { - case HandshakeReq(from) ⇒ onHandshakeReq(from) + case HandshakeReq(from, to) ⇒ onHandshakeReq(from, to) case _ ⇒ onMessage(env) } } }) - private def onHandshakeReq(from: UniqueAddress): Unit = { - after(inboundContext.completeHandshake(from)) { - inboundContext.sendControl(from.address, HandshakeRsp(inboundContext.localAddress)) + private def onHandshakeReq(from: UniqueAddress, to: Address): Unit = { + if (to == inboundContext.localAddress.address) { + after(inboundContext.completeHandshake(from)) { + inboundContext.sendControl(from.address, HandshakeRsp(inboundContext.localAddress)) + pull(in) + } + } else { + log.warning( + "Dropping Handshake Request from [{}] addressed to unknown local address [{}]. " + + "Local address is [{}]. Check that the sending system uses the same " + + "address to contact recipient system as defined in the " + + "'akka.remote.artery.canonical.hostname' of the recipient system. " + + "The name of the ActorSystem must also match.", + from, to, inboundContext.localAddress.address) + pull(in) } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/InboundEnvelope.scala b/akka-remote/src/main/scala/akka/remote/artery/InboundEnvelope.scala index c017d73107..b9d17362ae 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/InboundEnvelope.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/InboundEnvelope.scala @@ -16,14 +16,13 @@ private[remote] object InboundEnvelope { * Only used in tests */ def apply( - recipient: OptionVal[InternalActorRef], - recipientAddress: Address, - message: AnyRef, - sender: OptionVal[ActorRef], - originUid: Long, - association: OptionVal[OutboundContext]): InboundEnvelope = { + recipient: OptionVal[InternalActorRef], + message: AnyRef, + sender: OptionVal[ActorRef], + originUid: Long, + association: OptionVal[OutboundContext]): InboundEnvelope = { val env = new ReusableInboundEnvelope - env.init(recipient, recipientAddress, sender, originUid, -1, "", 0, null, association) + env.init(recipient, sender, originUid, -1, "", 0, null, association) .withMessage(message) } @@ -116,15 +115,14 @@ private[akka] final class ReusableInboundEnvelope extends InboundEnvelope { } def init( - recipient: OptionVal[InternalActorRef], - recipientAddress: Address, - sender: OptionVal[ActorRef], - originUid: Long, - serializer: Int, - classManifest: String, - flags: Byte, - envelopeBuffer: EnvelopeBuffer, - association: OptionVal[OutboundContext]): InboundEnvelope = { + recipient: OptionVal[InternalActorRef], + sender: OptionVal[ActorRef], + originUid: Long, + serializer: Int, + classManifest: String, + flags: Byte, + envelopeBuffer: EnvelopeBuffer, + association: OptionVal[OutboundContext]): InboundEnvelope = { _recipient = recipient _recipientAddress = recipientAddress _sender = sender diff --git a/akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala b/akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala index 7525473479..b5dc011a6e 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala @@ -71,13 +71,8 @@ private[akka] class MessageDispatcher( case r @ (_: RemoteRef | _: RepointableRef) if !r.isLocal && !UntrustedMode ⇒ if (LogReceive) log.debug("received remote-destined message {}", msgLog) - if (provider.transport.addresses(recipientAddress)) - // if it was originally addressed to us but is in fact remote from our point of view (i.e. remote-deployed) - r.!(message)(sender) - else - log.error( - "dropping message [{}] for non-local recipient [{}] arriving at [{}] inbound addresses are [{}]", - message.getClass, r, recipientAddress, provider.transport.addresses.mkString(", ")) + // if it was originally addressed to us but is in fact remote from our point of view (i.e. remote-deployed) + r.!(message)(sender) case r ⇒ log.error( "dropping message [{}] for unknown recipient [{}] arriving at [{}] inbound addresses are [{}]", diff --git a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala index 3e093fa5f8..1a6eed90d7 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala @@ -24,8 +24,6 @@ import akka.stream.stage.OutHandler import akka.stream.stage.TimerGraphStageLogic import akka.remote.artery.OutboundHandshake.HandshakeReq import akka.actor.ActorRef -import akka.remote.PriorityMessage -import akka.actor.ActorSelectionMessage import akka.dispatch.sysmsg.SystemMessage import scala.util.control.NoStackTrace diff --git a/akka-remote/src/main/scala/akka/remote/serialization/ArteryMessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/serialization/ArteryMessageSerializer.scala index 30499f86a4..ee2694a3ac 100644 --- a/akka-remote/src/main/scala/akka/remote/serialization/ArteryMessageSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/serialization/ArteryMessageSerializer.scala @@ -56,7 +56,7 @@ private[akka] final class ArteryMessageSerializer(val system: ExtendedActorSyste override def toBinary(o: AnyRef): Array[Byte] = (o match { // most frequent ones first case env: SystemMessageDelivery.SystemMessageEnvelope ⇒ serializeSystemMessageEnvelope(env) case SystemMessageDelivery.Ack(seqNo, from) ⇒ serializeSystemMessageDeliveryAck(seqNo, from) - case HandshakeReq(from) ⇒ serializeWithAddress(from) + case HandshakeReq(from, to) ⇒ serializeHandshakeReq(from, to) case HandshakeRsp(from) ⇒ serializeWithAddress(from) case SystemMessageDelivery.Nack(seqNo, from) ⇒ serializeSystemMessageDeliveryAck(seqNo, from) case q: Quarantined ⇒ serializeQuarantined(q) @@ -71,7 +71,7 @@ private[akka] final class ArteryMessageSerializer(val system: ExtendedActorSyste override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match { // most frequent ones first (could be made a HashMap in the future) case SystemMessageEnvelopeManifest ⇒ deserializeSystemMessageEnvelope(bytes) case SystemMessageDeliveryAckManifest ⇒ deserializeSystemMessageDeliveryAck(bytes, SystemMessageDelivery.Ack) - case HandshakeReqManifest ⇒ deserializeWithFromAddress(bytes, HandshakeReq) + case HandshakeReqManifest ⇒ deserializeHandshakeReq(bytes, HandshakeReq) case HandshakeRspManifest ⇒ deserializeWithFromAddress(bytes, HandshakeRsp) case SystemMessageDeliveryNackManifest ⇒ deserializeSystemMessageDeliveryAck(bytes, SystemMessageDelivery.Nack) case QuarantinedManifest ⇒ deserializeQuarantined(ArteryControlFormats.Quarantined.parseFrom(bytes)) @@ -192,6 +192,17 @@ private[akka] final class ArteryMessageSerializer(val system: ExtendedActorSyste def deserializeWithFromAddress(bytes: Array[Byte], create: UniqueAddress ⇒ AnyRef): AnyRef = create(deserializeUniqueAddress(ArteryControlFormats.MessageWithAddress.parseFrom(bytes).getAddress)) + def serializeHandshakeReq(from: UniqueAddress, to: Address): MessageLite = + ArteryControlFormats.HandshakeReq.newBuilder + .setFrom(serializeUniqueAddress(from)) + .setTo(serializeAddress(to)) + .build() + + def deserializeHandshakeReq(bytes: Array[Byte], create: (UniqueAddress, Address) ⇒ HandshakeReq): HandshakeReq = { + val protoEnv = ArteryControlFormats.HandshakeReq.parseFrom(bytes) + create(deserializeUniqueAddress(protoEnv.getFrom), deserializeAddress(protoEnv.getTo)) + } + def serializeUniqueAddress(address: UniqueAddress): ArteryControlFormats.UniqueAddress = ArteryControlFormats.UniqueAddress.newBuilder() .setAddress(serializeAddress(address.address)) diff --git a/akka-remote/src/test/scala/akka/remote/artery/HandshakeDenySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/HandshakeDenySpec.scala new file mode 100644 index 0000000000..864c0bbe49 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/HandshakeDenySpec.scala @@ -0,0 +1,48 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import scala.concurrent.duration._ + +import akka.actor.{ ActorIdentity, ActorSystem, Identify } +import akka.testkit._ +import com.typesafe.config.ConfigFactory +import akka.actor.RootActorPath + +object HandshakeDenySpec { + + val commonConfig = ConfigFactory.parseString(s""" + akka.loglevel = WARNING + akka { + actor.provider = remote + remote.artery.enabled = on + remote.artery.canonical.hostname = localhost + remote.artery.canonical.port = 0 + remote.artery.advanced.handshake-timeout = 2s + } + """) + +} + +class HandshakeDenySpec extends ArteryMultiNodeSpec(HandshakeDenySpec.commonConfig) with ImplicitSender { + import HandshakeDenySpec._ + + var systemB = newRemoteSystem(name = Some("systemB")) + + "Artery handshake" must { + + "be denied when originating address is unknown" in { + val sel = system.actorSelection(RootActorPath(address(systemB).copy(host = Some("127.0.0.1"))) / "user" / "echo") + + systemB.actorOf(TestActors.echoActorProps, "echo") + + EventFilter.warning(start = "Dropping Handshake Request from").intercept { + sel ! Identify(None) + expectNoMsg(3.seconds) + }(systemB) + } + + } + +} diff --git a/akka-remote/src/test/scala/akka/remote/artery/HandshakeRetrySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/HandshakeRetrySpec.scala index 3e8feadf59..2ce40262e0 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/HandshakeRetrySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/HandshakeRetrySpec.scala @@ -22,8 +22,7 @@ object HandshakeRetrySpec { remote.artery.enabled = on remote.artery.canonical.hostname = localhost remote.artery.canonical.port = 0 - remote.handshake-timeout = 10s - + remote.artery.advanced.handshake-timeout = 10s } """) diff --git a/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala index 44e3a26692..a6030bdffc 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala @@ -3,12 +3,9 @@ */ package akka.remote.artery -import scala.concurrent.duration._ import akka.actor.Address -import akka.actor.InternalActorRef import akka.remote.UniqueAddress import akka.remote.artery.InboundControlJunction.ControlMessageObserver -import akka.remote.artery.SystemMessageDelivery._ import akka.stream.ActorMaterializer import akka.stream.ActorMaterializerSettings import akka.stream.scaladsl.Keep @@ -49,14 +46,14 @@ class InboundControlJunctionSpec val recipient = OptionVal.None // not used val ((upstream, controlSubject), downstream) = TestSource.probe[AnyRef] - .map(msg ⇒ InboundEnvelope(recipient, addressB.address, msg, OptionVal.None, addressA.uid, OptionVal.None)) + .map(msg ⇒ InboundEnvelope(recipient, msg, OptionVal.None, addressA.uid, OptionVal.None)) .viaMat(new InboundControlJunction)(Keep.both) .map { case env: InboundEnvelope ⇒ env.message } .toMat(TestSink.probe[Any])(Keep.both) .run() controlSubject.attach(new ControlMessageObserver { - override def notify(env: InboundEnvelope) { + override def notify(env: InboundEnvelope) = { observerProbe.ref ! env.message } }) diff --git a/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala index befdc927e0..96f1b7d648 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala @@ -6,11 +6,9 @@ package akka.remote.artery import scala.concurrent.Await import scala.concurrent.duration._ import akka.actor.Address -import akka.actor.InternalActorRef import akka.remote.UniqueAddress import akka.remote.artery.OutboundHandshake.HandshakeReq import akka.remote.artery.OutboundHandshake.HandshakeRsp -import akka.remote.artery.SystemMessageDelivery._ import akka.stream.ActorMaterializer import akka.stream.ActorMaterializerSettings import akka.stream.scaladsl.Keep @@ -30,7 +28,6 @@ object InboundHandshakeSpec { } class InboundHandshakeSpec extends AkkaSpec with ImplicitSender { - import InboundHandshakeSpec._ val matSettings = ActorMaterializerSettings(system).withFuzzing(true) implicit val mat = ActorMaterializer(matSettings)(system) @@ -41,7 +38,7 @@ class InboundHandshakeSpec extends AkkaSpec with ImplicitSender { private def setupStream(inboundContext: InboundContext, timeout: FiniteDuration = 5.seconds): (TestPublisher.Probe[AnyRef], TestSubscriber.Probe[Any]) = { val recipient = OptionVal.None // not used TestSource.probe[AnyRef] - .map(msg ⇒ InboundEnvelope(recipient, addressB.address, msg, OptionVal.None, addressA.uid, + .map(msg ⇒ InboundEnvelope(recipient, msg, OptionVal.None, addressA.uid, inboundContext.association(addressA.uid))) .via(new InboundHandshake(inboundContext, inControlStream = true)) .map { case env: InboundEnvelope ⇒ env.message } @@ -57,7 +54,7 @@ class InboundHandshakeSpec extends AkkaSpec with ImplicitSender { val (upstream, downstream) = setupStream(inboundContext) downstream.request(10) - upstream.sendNext(HandshakeReq(addressA)) + upstream.sendNext(HandshakeReq(addressA, addressB.address)) upstream.sendNext("msg1") replyProbe.expectMsg(HandshakeRsp(addressB)) downstream.expectNext("msg1") @@ -69,7 +66,7 @@ class InboundHandshakeSpec extends AkkaSpec with ImplicitSender { val (upstream, downstream) = setupStream(inboundContext) downstream.request(10) - upstream.sendNext(HandshakeReq(addressA)) + upstream.sendNext(HandshakeReq(addressA, addressB.address)) upstream.sendNext("msg1") downstream.expectNext("msg1") val uniqueRemoteAddress = Await.result( @@ -89,7 +86,7 @@ class InboundHandshakeSpec extends AkkaSpec with ImplicitSender { downstream.expectNoMsg(200.millis) // messages from unknown are dropped // and accept messages after handshake - upstream.sendNext(HandshakeReq(addressA)) + upstream.sendNext(HandshakeReq(addressA, addressB.address)) upstream.sendNext("msg18") replyProbe.expectMsg(HandshakeRsp(addressB)) downstream.expectNext("msg18") diff --git a/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala index 3b044e654c..b5a87f0940 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala @@ -53,7 +53,7 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender { val (upstream, downstream) = setupStream(outboundContext) downstream.request(10) - downstream.expectNext(HandshakeReq(addressA)) + downstream.expectNext(HandshakeReq(addressA, addressB.address)) downstream.cancel() } @@ -65,7 +65,7 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender { upstream.sendNext("msg1") downstream.request(10) - downstream.expectNext(HandshakeReq(addressA)) + downstream.expectNext(HandshakeReq(addressA, addressB.address)) downstream.expectNext("msg1") downstream.cancel() } @@ -76,7 +76,7 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender { val (upstream, downstream) = setupStream(outboundContext, timeout = 200.millis) downstream.request(1) - downstream.expectNext(HandshakeReq(addressA)) + downstream.expectNext(HandshakeReq(addressA, addressB.address)) downstream.expectError().getClass should be(classOf[HandshakeTimeoutException]) } @@ -86,9 +86,9 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender { val (upstream, downstream) = setupStream(outboundContext, retryInterval = 100.millis) downstream.request(10) - downstream.expectNext(HandshakeReq(addressA)) - downstream.expectNext(HandshakeReq(addressA)) - downstream.expectNext(HandshakeReq(addressA)) + downstream.expectNext(HandshakeReq(addressA, addressB.address)) + downstream.expectNext(HandshakeReq(addressA, addressB.address)) + downstream.expectNext(HandshakeReq(addressA, addressB.address)) downstream.cancel() } @@ -98,7 +98,7 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender { val (upstream, downstream) = setupStream(outboundContext) downstream.request(10) - downstream.expectNext(HandshakeReq(addressA)) + downstream.expectNext(HandshakeReq(addressA, addressB.address)) upstream.sendNext("msg1") downstream.expectNoMsg(200.millis) // InboundHandshake stage will complete the handshake when receiving HandshakeRsp @@ -116,7 +116,7 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender { downstream.request(10) upstream.sendNext("msg1") - downstream.expectNext(HandshakeReq(addressA)) + downstream.expectNext(HandshakeReq(addressA, addressB.address)) inboundContext.completeHandshake(addressB) downstream.expectNext("msg1") @@ -124,7 +124,7 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender { upstream.sendNext("msg2") upstream.sendNext("msg3") upstream.sendNext("msg4") - downstream.expectNext(HandshakeReq(addressA)) + downstream.expectNext(HandshakeReq(addressA, addressB.address)) downstream.expectNext("msg2") downstream.expectNext("msg3") downstream.expectNext("msg4") diff --git a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageAckerSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageAckerSpec.scala index 5ba36ba0ef..fd0d44fdf1 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageAckerSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageAckerSpec.scala @@ -34,7 +34,7 @@ class SystemMessageAckerSpec extends AkkaSpec with ImplicitSender { TestSource.probe[AnyRef] .map { case sysMsg @ SystemMessageEnvelope(_, _, ackReplyTo) ⇒ - InboundEnvelope(recipient, addressA.address, sysMsg, OptionVal.None, ackReplyTo.uid, + InboundEnvelope(recipient, sysMsg, OptionVal.None, ackReplyTo.uid, inboundContext.association(ackReplyTo.uid)) } .via(new SystemMessageAcker(inboundContext)) diff --git a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala index 9264c7d980..cc7ed61608 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala @@ -10,12 +10,10 @@ import scala.concurrent.duration._ import akka.NotUsed import akka.actor.ActorIdentity import akka.actor.ActorSystem -import akka.actor.ExtendedActorSystem import akka.actor.Identify -import akka.actor.InternalActorRef import akka.actor.PoisonPill import akka.actor.RootActorPath -import akka.remote.{ AddressUidExtension, RARP, RemoteActorRef, UniqueAddress } +import akka.remote.{ AddressUidExtension, RARP, UniqueAddress } import akka.remote.artery.SystemMessageDelivery._ import akka.stream.ActorMaterializer import akka.stream.ActorMaterializerSettings @@ -79,7 +77,7 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi Flow[OutboundEnvelope] .map(outboundEnvelope ⇒ outboundEnvelope.message match { case sysEnv: SystemMessageEnvelope ⇒ - InboundEnvelope(recipient, addressB.address, sysEnv, OptionVal.None, addressA.uid, + InboundEnvelope(recipient, sysEnv, OptionVal.None, addressA.uid, inboundContext.association(addressA.uid)) }) .async diff --git a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala index db5965589d..422211634b 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala @@ -12,12 +12,10 @@ import scala.util.Success import akka.Done import akka.actor.ActorRef import akka.actor.Address -import akka.remote.RemoteActorRef import akka.remote.UniqueAddress import akka.remote.artery.InboundControlJunction.ControlMessageObserver import akka.remote.artery.InboundControlJunction.ControlMessageSubject import akka.util.OptionVal -import akka.actor.InternalActorRef import akka.dispatch.ExecutionContexts import com.typesafe.config.ConfigFactory @@ -94,7 +92,7 @@ private[remote] class TestOutboundContext( override def sendControl(message: ControlMessage) = { controlProbe.foreach(_ ! message) - controlSubject.sendControl(InboundEnvelope(OptionVal.None, remoteAddress, message, OptionVal.None, localAddress.uid, + controlSubject.sendControl(InboundEnvelope(OptionVal.None, message, OptionVal.None, localAddress.uid, OptionVal.None)) } diff --git a/akka-remote/src/test/scala/akka/remote/serialization/ArteryMessageSerializerSpec.scala b/akka-remote/src/test/scala/akka/remote/serialization/ArteryMessageSerializerSpec.scala index 2d247067b9..bdf5333632 100644 --- a/akka-remote/src/test/scala/akka/remote/serialization/ArteryMessageSerializerSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/serialization/ArteryMessageSerializerSpec.scala @@ -22,7 +22,7 @@ class ArteryMessageSerializerSpec extends AkkaSpec { "Quarantined" → Quarantined(uniqueAddress(), uniqueAddress()), "ActorSystemTerminating" → ActorSystemTerminating(uniqueAddress()), "ActorSystemTerminatingAck" → ActorSystemTerminatingAck(uniqueAddress()), - "HandshakeReq" → HandshakeReq(uniqueAddress()), + "HandshakeReq" → HandshakeReq(uniqueAddress(), uniqueAddress().address), "HandshakeRsp" → HandshakeRsp(uniqueAddress()), "ActorRefCompressionAdvertisement" → ActorRefCompressionAdvertisement(uniqueAddress(), CompressionTable(17L, 123, Map(actorA → 123, actorB → 456, system.deadLetters → 0))), "ActorRefCompressionAdvertisementAck" → ActorRefCompressionAdvertisementAck(uniqueAddress(), 23),