diff --git a/akka-remote/src/main/java/akka/remote/ArteryControlFormats.java b/akka-remote/src/main/java/akka/remote/ArteryControlFormats.java
index eb176e91ba..22deb5c3cd 100644
--- a/akka-remote/src/main/java/akka/remote/ArteryControlFormats.java
+++ b/akka-remote/src/main/java/akka/remote/ArteryControlFormats.java
@@ -764,7 +764,6 @@ public final class ArteryControlFormats {
* address field. A message that needs to changed later can be cloned from this one and then adapted.
* ActorSystemTerminating
* ActorSystemTerminating.Ack
- * OutboundHandshake.HandshakeReq
* OutboundHandshake.HandshakeRsp
*
*/
@@ -1016,7 +1015,6 @@ public final class ArteryControlFormats {
* address field. A message that needs to changed later can be cloned from this one and then adapted.
* ActorSystemTerminating
* ActorSystemTerminating.Ack
- * OutboundHandshake.HandshakeReq
* OutboundHandshake.HandshakeRsp
*
*/
@@ -1280,6 +1278,737 @@ public final class ArteryControlFormats {
// @@protoc_insertion_point(class_scope:MessageWithAddress)
}
+ public interface HandshakeReqOrBuilder
+ extends akka.protobuf.MessageOrBuilder {
+
+ // required .UniqueAddress from = 1;
+ /**
+ * required .UniqueAddress from = 1;
+ */
+ boolean hasFrom();
+ /**
+ * required .UniqueAddress from = 1;
+ */
+ akka.remote.ArteryControlFormats.UniqueAddress getFrom();
+ /**
+ * required .UniqueAddress from = 1;
+ */
+ akka.remote.ArteryControlFormats.UniqueAddressOrBuilder getFromOrBuilder();
+
+ // required .Address to = 2;
+ /**
+ * required .Address to = 2;
+ */
+ boolean hasTo();
+ /**
+ * required .Address to = 2;
+ */
+ akka.remote.ArteryControlFormats.Address getTo();
+ /**
+ * required .Address to = 2;
+ */
+ akka.remote.ArteryControlFormats.AddressOrBuilder getToOrBuilder();
+ }
+ /**
+ * Protobuf type {@code HandshakeReq}
+ */
+ public static final class HandshakeReq extends
+ akka.protobuf.GeneratedMessage
+ implements HandshakeReqOrBuilder {
+ // Use HandshakeReq.newBuilder() to construct.
+ private HandshakeReq(akka.protobuf.GeneratedMessage.Builder> builder) {
+ super(builder);
+ this.unknownFields = builder.getUnknownFields();
+ }
+ private HandshakeReq(boolean noInit) { this.unknownFields = akka.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+ private static final HandshakeReq defaultInstance;
+ public static HandshakeReq getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public HandshakeReq getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ private final akka.protobuf.UnknownFieldSet unknownFields;
+ @java.lang.Override
+ public final akka.protobuf.UnknownFieldSet
+ getUnknownFields() {
+ return this.unknownFields;
+ }
+ private HandshakeReq(
+ akka.protobuf.CodedInputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ initFields();
+ int mutable_bitField0_ = 0;
+ akka.protobuf.UnknownFieldSet.Builder unknownFields =
+ akka.protobuf.UnknownFieldSet.newBuilder();
+ try {
+ boolean done = false;
+ while (!done) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+ done = true;
+ break;
+ default: {
+ if (!parseUnknownField(input, unknownFields,
+ extensionRegistry, tag)) {
+ done = true;
+ }
+ break;
+ }
+ case 10: {
+ akka.remote.ArteryControlFormats.UniqueAddress.Builder subBuilder = null;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ subBuilder = from_.toBuilder();
+ }
+ from_ = input.readMessage(akka.remote.ArteryControlFormats.UniqueAddress.PARSER, extensionRegistry);
+ if (subBuilder != null) {
+ subBuilder.mergeFrom(from_);
+ from_ = subBuilder.buildPartial();
+ }
+ bitField0_ |= 0x00000001;
+ break;
+ }
+ case 18: {
+ akka.remote.ArteryControlFormats.Address.Builder subBuilder = null;
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ subBuilder = to_.toBuilder();
+ }
+ to_ = input.readMessage(akka.remote.ArteryControlFormats.Address.PARSER, extensionRegistry);
+ if (subBuilder != null) {
+ subBuilder.mergeFrom(to_);
+ to_ = subBuilder.buildPartial();
+ }
+ bitField0_ |= 0x00000002;
+ break;
+ }
+ }
+ }
+ } catch (akka.protobuf.InvalidProtocolBufferException e) {
+ throw e.setUnfinishedMessage(this);
+ } catch (java.io.IOException e) {
+ throw new akka.protobuf.InvalidProtocolBufferException(
+ e.getMessage()).setUnfinishedMessage(this);
+ } finally {
+ this.unknownFields = unknownFields.build();
+ makeExtensionsImmutable();
+ }
+ }
+ public static final akka.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return akka.remote.ArteryControlFormats.internal_static_HandshakeReq_descriptor;
+ }
+
+ protected akka.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return akka.remote.ArteryControlFormats.internal_static_HandshakeReq_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ akka.remote.ArteryControlFormats.HandshakeReq.class, akka.remote.ArteryControlFormats.HandshakeReq.Builder.class);
+ }
+
+ public static akka.protobuf.Parser PARSER =
+ new akka.protobuf.AbstractParser() {
+ public HandshakeReq parsePartialFrom(
+ akka.protobuf.CodedInputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return new HandshakeReq(input, extensionRegistry);
+ }
+ };
+
+ @java.lang.Override
+ public akka.protobuf.Parser getParserForType() {
+ return PARSER;
+ }
+
+ private int bitField0_;
+ // required .UniqueAddress from = 1;
+ public static final int FROM_FIELD_NUMBER = 1;
+ private akka.remote.ArteryControlFormats.UniqueAddress from_;
+ /**
+ * required .UniqueAddress from = 1;
+ */
+ public boolean hasFrom() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * required .UniqueAddress from = 1;
+ */
+ public akka.remote.ArteryControlFormats.UniqueAddress getFrom() {
+ return from_;
+ }
+ /**
+ * required .UniqueAddress from = 1;
+ */
+ public akka.remote.ArteryControlFormats.UniqueAddressOrBuilder getFromOrBuilder() {
+ return from_;
+ }
+
+ // required .Address to = 2;
+ public static final int TO_FIELD_NUMBER = 2;
+ private akka.remote.ArteryControlFormats.Address to_;
+ /**
+ * required .Address to = 2;
+ */
+ public boolean hasTo() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ /**
+ * required .Address to = 2;
+ */
+ public akka.remote.ArteryControlFormats.Address getTo() {
+ return to_;
+ }
+ /**
+ * required .Address to = 2;
+ */
+ public akka.remote.ArteryControlFormats.AddressOrBuilder getToOrBuilder() {
+ return to_;
+ }
+
+ private void initFields() {
+ from_ = akka.remote.ArteryControlFormats.UniqueAddress.getDefaultInstance();
+ to_ = akka.remote.ArteryControlFormats.Address.getDefaultInstance();
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ if (!hasFrom()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (!hasTo()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (!getFrom().isInitialized()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ if (!getTo().isInitialized()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(akka.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeMessage(1, from_);
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ output.writeMessage(2, to_);
+ }
+ getUnknownFields().writeTo(output);
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += akka.protobuf.CodedOutputStream
+ .computeMessageSize(1, from_);
+ }
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ size += akka.protobuf.CodedOutputStream
+ .computeMessageSize(2, to_);
+ }
+ size += getUnknownFields().getSerializedSize();
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ public static akka.remote.ArteryControlFormats.HandshakeReq parseFrom(
+ akka.protobuf.ByteString data)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static akka.remote.ArteryControlFormats.HandshakeReq parseFrom(
+ akka.protobuf.ByteString data,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static akka.remote.ArteryControlFormats.HandshakeReq parseFrom(byte[] data)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static akka.remote.ArteryControlFormats.HandshakeReq parseFrom(
+ byte[] data,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static akka.remote.ArteryControlFormats.HandshakeReq parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static akka.remote.ArteryControlFormats.HandshakeReq parseFrom(
+ java.io.InputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+ public static akka.remote.ArteryControlFormats.HandshakeReq parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input);
+ }
+ public static akka.remote.ArteryControlFormats.HandshakeReq parseDelimitedFrom(
+ java.io.InputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input, extensionRegistry);
+ }
+ public static akka.remote.ArteryControlFormats.HandshakeReq parseFrom(
+ akka.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static akka.remote.ArteryControlFormats.HandshakeReq parseFrom(
+ akka.protobuf.CodedInputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(akka.remote.ArteryControlFormats.HandshakeReq prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ akka.protobuf.GeneratedMessage.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ /**
+ * Protobuf type {@code HandshakeReq}
+ */
+ public static final class Builder extends
+ akka.protobuf.GeneratedMessage.Builder
+ implements akka.remote.ArteryControlFormats.HandshakeReqOrBuilder {
+ public static final akka.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return akka.remote.ArteryControlFormats.internal_static_HandshakeReq_descriptor;
+ }
+
+ protected akka.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return akka.remote.ArteryControlFormats.internal_static_HandshakeReq_fieldAccessorTable
+ .ensureFieldAccessorsInitialized(
+ akka.remote.ArteryControlFormats.HandshakeReq.class, akka.remote.ArteryControlFormats.HandshakeReq.Builder.class);
+ }
+
+ // Construct using akka.remote.ArteryControlFormats.HandshakeReq.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(
+ akka.protobuf.GeneratedMessage.BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+ getFromFieldBuilder();
+ getToFieldBuilder();
+ }
+ }
+ private static Builder create() {
+ return new Builder();
+ }
+
+ public Builder clear() {
+ super.clear();
+ if (fromBuilder_ == null) {
+ from_ = akka.remote.ArteryControlFormats.UniqueAddress.getDefaultInstance();
+ } else {
+ fromBuilder_.clear();
+ }
+ bitField0_ = (bitField0_ & ~0x00000001);
+ if (toBuilder_ == null) {
+ to_ = akka.remote.ArteryControlFormats.Address.getDefaultInstance();
+ } else {
+ toBuilder_.clear();
+ }
+ bitField0_ = (bitField0_ & ~0x00000002);
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public akka.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return akka.remote.ArteryControlFormats.internal_static_HandshakeReq_descriptor;
+ }
+
+ public akka.remote.ArteryControlFormats.HandshakeReq getDefaultInstanceForType() {
+ return akka.remote.ArteryControlFormats.HandshakeReq.getDefaultInstance();
+ }
+
+ public akka.remote.ArteryControlFormats.HandshakeReq build() {
+ akka.remote.ArteryControlFormats.HandshakeReq result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ public akka.remote.ArteryControlFormats.HandshakeReq buildPartial() {
+ akka.remote.ArteryControlFormats.HandshakeReq result = new akka.remote.ArteryControlFormats.HandshakeReq(this);
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ if (fromBuilder_ == null) {
+ result.from_ = from_;
+ } else {
+ result.from_ = fromBuilder_.build();
+ }
+ if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+ to_bitField0_ |= 0x00000002;
+ }
+ if (toBuilder_ == null) {
+ result.to_ = to_;
+ } else {
+ result.to_ = toBuilder_.build();
+ }
+ result.bitField0_ = to_bitField0_;
+ onBuilt();
+ return result;
+ }
+
+ public Builder mergeFrom(akka.protobuf.Message other) {
+ if (other instanceof akka.remote.ArteryControlFormats.HandshakeReq) {
+ return mergeFrom((akka.remote.ArteryControlFormats.HandshakeReq)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(akka.remote.ArteryControlFormats.HandshakeReq other) {
+ if (other == akka.remote.ArteryControlFormats.HandshakeReq.getDefaultInstance()) return this;
+ if (other.hasFrom()) {
+ mergeFrom(other.getFrom());
+ }
+ if (other.hasTo()) {
+ mergeTo(other.getTo());
+ }
+ this.mergeUnknownFields(other.getUnknownFields());
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ if (!hasFrom()) {
+
+ return false;
+ }
+ if (!hasTo()) {
+
+ return false;
+ }
+ if (!getFrom().isInitialized()) {
+
+ return false;
+ }
+ if (!getTo().isInitialized()) {
+
+ return false;
+ }
+ return true;
+ }
+
+ public Builder mergeFrom(
+ akka.protobuf.CodedInputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ akka.remote.ArteryControlFormats.HandshakeReq parsedMessage = null;
+ try {
+ parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+ } catch (akka.protobuf.InvalidProtocolBufferException e) {
+ parsedMessage = (akka.remote.ArteryControlFormats.HandshakeReq) e.getUnfinishedMessage();
+ throw e;
+ } finally {
+ if (parsedMessage != null) {
+ mergeFrom(parsedMessage);
+ }
+ }
+ return this;
+ }
+ private int bitField0_;
+
+ // required .UniqueAddress from = 1;
+ private akka.remote.ArteryControlFormats.UniqueAddress from_ = akka.remote.ArteryControlFormats.UniqueAddress.getDefaultInstance();
+ private akka.protobuf.SingleFieldBuilder<
+ akka.remote.ArteryControlFormats.UniqueAddress, akka.remote.ArteryControlFormats.UniqueAddress.Builder, akka.remote.ArteryControlFormats.UniqueAddressOrBuilder> fromBuilder_;
+ /**
+ * required .UniqueAddress from = 1;
+ */
+ public boolean hasFrom() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * required .UniqueAddress from = 1;
+ */
+ public akka.remote.ArteryControlFormats.UniqueAddress getFrom() {
+ if (fromBuilder_ == null) {
+ return from_;
+ } else {
+ return fromBuilder_.getMessage();
+ }
+ }
+ /**
+ * required .UniqueAddress from = 1;
+ */
+ public Builder setFrom(akka.remote.ArteryControlFormats.UniqueAddress value) {
+ if (fromBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ from_ = value;
+ onChanged();
+ } else {
+ fromBuilder_.setMessage(value);
+ }
+ bitField0_ |= 0x00000001;
+ return this;
+ }
+ /**
+ * required .UniqueAddress from = 1;
+ */
+ public Builder setFrom(
+ akka.remote.ArteryControlFormats.UniqueAddress.Builder builderForValue) {
+ if (fromBuilder_ == null) {
+ from_ = builderForValue.build();
+ onChanged();
+ } else {
+ fromBuilder_.setMessage(builderForValue.build());
+ }
+ bitField0_ |= 0x00000001;
+ return this;
+ }
+ /**
+ * required .UniqueAddress from = 1;
+ */
+ public Builder mergeFrom(akka.remote.ArteryControlFormats.UniqueAddress value) {
+ if (fromBuilder_ == null) {
+ if (((bitField0_ & 0x00000001) == 0x00000001) &&
+ from_ != akka.remote.ArteryControlFormats.UniqueAddress.getDefaultInstance()) {
+ from_ =
+ akka.remote.ArteryControlFormats.UniqueAddress.newBuilder(from_).mergeFrom(value).buildPartial();
+ } else {
+ from_ = value;
+ }
+ onChanged();
+ } else {
+ fromBuilder_.mergeFrom(value);
+ }
+ bitField0_ |= 0x00000001;
+ return this;
+ }
+ /**
+ * required .UniqueAddress from = 1;
+ */
+ public Builder clearFrom() {
+ if (fromBuilder_ == null) {
+ from_ = akka.remote.ArteryControlFormats.UniqueAddress.getDefaultInstance();
+ onChanged();
+ } else {
+ fromBuilder_.clear();
+ }
+ bitField0_ = (bitField0_ & ~0x00000001);
+ return this;
+ }
+ /**
+ * required .UniqueAddress from = 1;
+ */
+ public akka.remote.ArteryControlFormats.UniqueAddress.Builder getFromBuilder() {
+ bitField0_ |= 0x00000001;
+ onChanged();
+ return getFromFieldBuilder().getBuilder();
+ }
+ /**
+ * required .UniqueAddress from = 1;
+ */
+ public akka.remote.ArteryControlFormats.UniqueAddressOrBuilder getFromOrBuilder() {
+ if (fromBuilder_ != null) {
+ return fromBuilder_.getMessageOrBuilder();
+ } else {
+ return from_;
+ }
+ }
+ /**
+ * required .UniqueAddress from = 1;
+ */
+ private akka.protobuf.SingleFieldBuilder<
+ akka.remote.ArteryControlFormats.UniqueAddress, akka.remote.ArteryControlFormats.UniqueAddress.Builder, akka.remote.ArteryControlFormats.UniqueAddressOrBuilder>
+ getFromFieldBuilder() {
+ if (fromBuilder_ == null) {
+ fromBuilder_ = new akka.protobuf.SingleFieldBuilder<
+ akka.remote.ArteryControlFormats.UniqueAddress, akka.remote.ArteryControlFormats.UniqueAddress.Builder, akka.remote.ArteryControlFormats.UniqueAddressOrBuilder>(
+ from_,
+ getParentForChildren(),
+ isClean());
+ from_ = null;
+ }
+ return fromBuilder_;
+ }
+
+ // required .Address to = 2;
+ private akka.remote.ArteryControlFormats.Address to_ = akka.remote.ArteryControlFormats.Address.getDefaultInstance();
+ private akka.protobuf.SingleFieldBuilder<
+ akka.remote.ArteryControlFormats.Address, akka.remote.ArteryControlFormats.Address.Builder, akka.remote.ArteryControlFormats.AddressOrBuilder> toBuilder_;
+ /**
+ * required .Address to = 2;
+ */
+ public boolean hasTo() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ /**
+ * required .Address to = 2;
+ */
+ public akka.remote.ArteryControlFormats.Address getTo() {
+ if (toBuilder_ == null) {
+ return to_;
+ } else {
+ return toBuilder_.getMessage();
+ }
+ }
+ /**
+ * required .Address to = 2;
+ */
+ public Builder setTo(akka.remote.ArteryControlFormats.Address value) {
+ if (toBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ to_ = value;
+ onChanged();
+ } else {
+ toBuilder_.setMessage(value);
+ }
+ bitField0_ |= 0x00000002;
+ return this;
+ }
+ /**
+ * required .Address to = 2;
+ */
+ public Builder setTo(
+ akka.remote.ArteryControlFormats.Address.Builder builderForValue) {
+ if (toBuilder_ == null) {
+ to_ = builderForValue.build();
+ onChanged();
+ } else {
+ toBuilder_.setMessage(builderForValue.build());
+ }
+ bitField0_ |= 0x00000002;
+ return this;
+ }
+ /**
+ * required .Address to = 2;
+ */
+ public Builder mergeTo(akka.remote.ArteryControlFormats.Address value) {
+ if (toBuilder_ == null) {
+ if (((bitField0_ & 0x00000002) == 0x00000002) &&
+ to_ != akka.remote.ArteryControlFormats.Address.getDefaultInstance()) {
+ to_ =
+ akka.remote.ArteryControlFormats.Address.newBuilder(to_).mergeFrom(value).buildPartial();
+ } else {
+ to_ = value;
+ }
+ onChanged();
+ } else {
+ toBuilder_.mergeFrom(value);
+ }
+ bitField0_ |= 0x00000002;
+ return this;
+ }
+ /**
+ * required .Address to = 2;
+ */
+ public Builder clearTo() {
+ if (toBuilder_ == null) {
+ to_ = akka.remote.ArteryControlFormats.Address.getDefaultInstance();
+ onChanged();
+ } else {
+ toBuilder_.clear();
+ }
+ bitField0_ = (bitField0_ & ~0x00000002);
+ return this;
+ }
+ /**
+ * required .Address to = 2;
+ */
+ public akka.remote.ArteryControlFormats.Address.Builder getToBuilder() {
+ bitField0_ |= 0x00000002;
+ onChanged();
+ return getToFieldBuilder().getBuilder();
+ }
+ /**
+ * required .Address to = 2;
+ */
+ public akka.remote.ArteryControlFormats.AddressOrBuilder getToOrBuilder() {
+ if (toBuilder_ != null) {
+ return toBuilder_.getMessageOrBuilder();
+ } else {
+ return to_;
+ }
+ }
+ /**
+ * required .Address to = 2;
+ */
+ private akka.protobuf.SingleFieldBuilder<
+ akka.remote.ArteryControlFormats.Address, akka.remote.ArteryControlFormats.Address.Builder, akka.remote.ArteryControlFormats.AddressOrBuilder>
+ getToFieldBuilder() {
+ if (toBuilder_ == null) {
+ toBuilder_ = new akka.protobuf.SingleFieldBuilder<
+ akka.remote.ArteryControlFormats.Address, akka.remote.ArteryControlFormats.Address.Builder, akka.remote.ArteryControlFormats.AddressOrBuilder>(
+ to_,
+ getParentForChildren(),
+ isClean());
+ to_ = null;
+ }
+ return toBuilder_;
+ }
+
+ // @@protoc_insertion_point(builder_scope:HandshakeReq)
+ }
+
+ static {
+ defaultInstance = new HandshakeReq(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:HandshakeReq)
+ }
+
public interface CompressionTableAdvertisementOrBuilder
extends akka.protobuf.MessageOrBuilder {
@@ -6102,6 +6831,11 @@ public final class ArteryControlFormats {
private static
akka.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_MessageWithAddress_fieldAccessorTable;
+ private static akka.protobuf.Descriptors.Descriptor
+ internal_static_HandshakeReq_descriptor;
+ private static
+ akka.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_HandshakeReq_fieldAccessorTable;
private static akka.protobuf.Descriptors.Descriptor
internal_static_CompressionTableAdvertisement_descriptor;
private static
@@ -6144,22 +6878,24 @@ public final class ArteryControlFormats {
"\n\032ArteryControlFormats.proto\"G\n\013Quaranti" +
"ned\022\034\n\004from\030\001 \002(\0132\016.UniqueAddress\022\032\n\002to\030" +
"\002 \002(\0132\016.UniqueAddress\"5\n\022MessageWithAddr" +
- "ess\022\037\n\007address\030\001 \002(\0132\016.UniqueAddress\"\204\001\n" +
- "\035CompressionTableAdvertisement\022\034\n\004from\030\001" +
- " \002(\0132\016.UniqueAddress\022\021\n\toriginUid\030\002 \002(\004\022" +
- "\024\n\014tableVersion\030\003 \002(\r\022\014\n\004keys\030\004 \003(\t\022\016\n\006v" +
- "alues\030\005 \003(\r\"Q\n CompressionTableAdvertise" +
- "mentAck\022\034\n\004from\030\001 \002(\0132\016.UniqueAddress\022\017\n" +
- "\007version\030\002 \002(\r\"\212\001\n\025SystemMessageEnvelope",
- "\022\017\n\007message\030\001 \002(\014\022\024\n\014serializerId\030\002 \002(\005\022" +
- "\027\n\017messageManifest\030\003 \001(\014\022\r\n\005seqNo\030\004 \002(\004\022" +
- "\"\n\nackReplyTo\030\005 \002(\0132\016.UniqueAddress\"G\n\030S" +
- "ystemMessageDeliveryAck\022\r\n\005seqNo\030\001 \002(\004\022\034" +
- "\n\004from\030\002 \002(\0132\016.UniqueAddress\"K\n\007Address\022" +
- "\020\n\010protocol\030\001 \002(\t\022\016\n\006system\030\002 \002(\t\022\020\n\010hos" +
- "tname\030\003 \002(\t\022\014\n\004port\030\004 \002(\r\"7\n\rUniqueAddre" +
- "ss\022\031\n\007address\030\001 \002(\0132\010.Address\022\013\n\003uid\030\002 \002" +
- "(\004B\017\n\013akka.remoteH\001"
+ "ess\022\037\n\007address\030\001 \002(\0132\016.UniqueAddress\"B\n\014" +
+ "HandshakeReq\022\034\n\004from\030\001 \002(\0132\016.UniqueAddre" +
+ "ss\022\024\n\002to\030\002 \002(\0132\010.Address\"\204\001\n\035Compression" +
+ "TableAdvertisement\022\034\n\004from\030\001 \002(\0132\016.Uniqu" +
+ "eAddress\022\021\n\toriginUid\030\002 \002(\004\022\024\n\014tableVers" +
+ "ion\030\003 \002(\r\022\014\n\004keys\030\004 \003(\t\022\016\n\006values\030\005 \003(\r\"" +
+ "Q\n CompressionTableAdvertisementAck\022\034\n\004f",
+ "rom\030\001 \002(\0132\016.UniqueAddress\022\017\n\007version\030\002 \002" +
+ "(\r\"\212\001\n\025SystemMessageEnvelope\022\017\n\007message\030" +
+ "\001 \002(\014\022\024\n\014serializerId\030\002 \002(\005\022\027\n\017messageMa" +
+ "nifest\030\003 \001(\014\022\r\n\005seqNo\030\004 \002(\004\022\"\n\nackReplyT" +
+ "o\030\005 \002(\0132\016.UniqueAddress\"G\n\030SystemMessage" +
+ "DeliveryAck\022\r\n\005seqNo\030\001 \002(\004\022\034\n\004from\030\002 \002(\013" +
+ "2\016.UniqueAddress\"K\n\007Address\022\020\n\010protocol\030" +
+ "\001 \002(\t\022\016\n\006system\030\002 \002(\t\022\020\n\010hostname\030\003 \002(\t\022" +
+ "\014\n\004port\030\004 \002(\r\"7\n\rUniqueAddress\022\031\n\007addres" +
+ "s\030\001 \002(\0132\010.Address\022\013\n\003uid\030\002 \002(\004B\017\n\013akka.r",
+ "emoteH\001"
};
akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -6178,38 +6914,44 @@ public final class ArteryControlFormats {
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_MessageWithAddress_descriptor,
new java.lang.String[] { "Address", });
- internal_static_CompressionTableAdvertisement_descriptor =
+ internal_static_HandshakeReq_descriptor =
getDescriptor().getMessageTypes().get(2);
+ internal_static_HandshakeReq_fieldAccessorTable = new
+ akka.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_HandshakeReq_descriptor,
+ new java.lang.String[] { "From", "To", });
+ internal_static_CompressionTableAdvertisement_descriptor =
+ getDescriptor().getMessageTypes().get(3);
internal_static_CompressionTableAdvertisement_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_CompressionTableAdvertisement_descriptor,
new java.lang.String[] { "From", "OriginUid", "TableVersion", "Keys", "Values", });
internal_static_CompressionTableAdvertisementAck_descriptor =
- getDescriptor().getMessageTypes().get(3);
+ getDescriptor().getMessageTypes().get(4);
internal_static_CompressionTableAdvertisementAck_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_CompressionTableAdvertisementAck_descriptor,
new java.lang.String[] { "From", "Version", });
internal_static_SystemMessageEnvelope_descriptor =
- getDescriptor().getMessageTypes().get(4);
+ getDescriptor().getMessageTypes().get(5);
internal_static_SystemMessageEnvelope_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_SystemMessageEnvelope_descriptor,
new java.lang.String[] { "Message", "SerializerId", "MessageManifest", "SeqNo", "AckReplyTo", });
internal_static_SystemMessageDeliveryAck_descriptor =
- getDescriptor().getMessageTypes().get(5);
+ getDescriptor().getMessageTypes().get(6);
internal_static_SystemMessageDeliveryAck_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_SystemMessageDeliveryAck_descriptor,
new java.lang.String[] { "SeqNo", "From", });
internal_static_Address_descriptor =
- getDescriptor().getMessageTypes().get(6);
+ getDescriptor().getMessageTypes().get(7);
internal_static_Address_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Address_descriptor,
new java.lang.String[] { "Protocol", "System", "Hostname", "Port", });
internal_static_UniqueAddress_descriptor =
- getDescriptor().getMessageTypes().get(7);
+ getDescriptor().getMessageTypes().get(8);
internal_static_UniqueAddress_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_UniqueAddress_descriptor,
diff --git a/akka-remote/src/main/protobuf/ArteryControlFormats.proto b/akka-remote/src/main/protobuf/ArteryControlFormats.proto
index aafff98016..d71d66fe63 100644
--- a/akka-remote/src/main/protobuf/ArteryControlFormats.proto
+++ b/akka-remote/src/main/protobuf/ArteryControlFormats.proto
@@ -14,12 +14,16 @@ message Quarantined {
// address field. A message that needs to changed later can be cloned from this one and then adapted.
// ActorSystemTerminating
// ActorSystemTerminating.Ack
-// OutboundHandshake.HandshakeReq
// OutboundHandshake.HandshakeRsp
message MessageWithAddress {
required UniqueAddress address = 1;
}
+message HandshakeReq {
+ required UniqueAddress from = 1;
+ required Address to = 2;
+}
+
// CompressionProtocol.ActorRefCompressionAdvertisement
// CompressionProtocol.ClassManifestCompressionAdvertisement
message CompressionTableAdvertisement {
diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala
index 58474c3be3..a470e5ae89 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala
@@ -11,7 +11,7 @@ import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope
import akka.serialization.{ Serialization, SerializationExtension }
import akka.stream._
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
-import akka.util.{ ByteString, OptionVal, PrettyByteString }
+import akka.util.{ ByteString, OptionVal }
import akka.actor.EmptyLocalActorRef
import akka.remote.artery.compress.InboundCompressions
import akka.stream.stage.TimerGraphStageLogic
@@ -24,8 +24,6 @@ import akka.stream.stage.GraphStageWithMaterializedValue
import scala.concurrent.Promise
-import scala.annotation.switch
-
/**
* INTERNAL API
*/
@@ -378,7 +376,6 @@ private[remote] class Decoder(
val decoded = inEnvelopePool.acquire().init(
recipient,
- localAddress, // FIXME: this is used for the "non-local recipient" check in MessageDispatcher. Is this needed anymore?
sender,
originUid,
headerBuilder.serializer,
@@ -405,8 +402,9 @@ private[remote] class Decoder(
scheduleOnce(RetryResolveRemoteDeployedRecipient(
retryResolveRemoteDeployedRecipientAttempts,
recipientActorRefPath, decoded), retryResolveRemoteDeployedRecipientInterval)
- } else
+ } else {
push(out, decoded)
+ }
}
}
@@ -537,4 +535,3 @@ private[remote] class Deserializer(
setHandlers(in, out, this)
}
}
-
diff --git a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala
index 801547d87f..3448c26e78 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala
@@ -20,6 +20,7 @@ import akka.stream.stage.TimerGraphStageLogic
import akka.util.OptionVal
import akka.Done
import scala.concurrent.Future
+import akka.actor.Address
/**
* INTERNAL API
@@ -32,7 +33,7 @@ private[akka] object OutboundHandshake {
*/
class HandshakeTimeoutException(msg: String) extends RuntimeException(msg) with NoStackTrace
- final case class HandshakeReq(from: UniqueAddress) extends ControlMessage
+ final case class HandshakeReq(from: UniqueAddress, to: Address) extends ControlMessage
final case class HandshakeRsp(from: UniqueAddress) extends Reply
private sealed trait HandshakeState
@@ -130,7 +131,7 @@ private[akka] class OutboundHandshake(
injectHandshakeTickScheduled = true
scheduleOnce(InjectHandshakeTick, injectHandshakeInterval)
val env: OutboundEnvelope = outboundEnvelopePool.acquire().init(
- recipient = OptionVal.None, message = HandshakeReq(outboundContext.localAddress), sender = OptionVal.None)
+ recipient = OptionVal.None, message = HandshakeReq(outboundContext.localAddress, outboundContext.remoteAddress), sender = OptionVal.None)
push(out, env)
}
@@ -176,7 +177,7 @@ private[akka] class InboundHandshake(inboundContext: InboundContext, inControlSt
override def onPush(): Unit = {
val env = grab(in)
env.message match {
- case HandshakeReq(from) ⇒ onHandshakeReq(from)
+ case HandshakeReq(from, to) ⇒ onHandshakeReq(from, to)
case HandshakeRsp(from) ⇒
after(inboundContext.completeHandshake(from)) {
pull(in)
@@ -191,16 +192,28 @@ private[akka] class InboundHandshake(inboundContext: InboundContext, inControlSt
override def onPush(): Unit = {
val env = grab(in)
env.message match {
- case HandshakeReq(from) ⇒ onHandshakeReq(from)
+ case HandshakeReq(from, to) ⇒ onHandshakeReq(from, to)
case _ ⇒
onMessage(env)
}
}
})
- private def onHandshakeReq(from: UniqueAddress): Unit = {
- after(inboundContext.completeHandshake(from)) {
- inboundContext.sendControl(from.address, HandshakeRsp(inboundContext.localAddress))
+ private def onHandshakeReq(from: UniqueAddress, to: Address): Unit = {
+ if (to == inboundContext.localAddress.address) {
+ after(inboundContext.completeHandshake(from)) {
+ inboundContext.sendControl(from.address, HandshakeRsp(inboundContext.localAddress))
+ pull(in)
+ }
+ } else {
+ log.warning(
+ "Dropping Handshake Request from [{}] addressed to unknown local address [{}]. " +
+ "Local address is [{}]. Check that the sending system uses the same " +
+ "address to contact recipient system as defined in the " +
+ "'akka.remote.artery.canonical.hostname' of the recipient system. " +
+ "The name of the ActorSystem must also match.",
+ from, to, inboundContext.localAddress.address)
+
pull(in)
}
}
diff --git a/akka-remote/src/main/scala/akka/remote/artery/InboundEnvelope.scala b/akka-remote/src/main/scala/akka/remote/artery/InboundEnvelope.scala
index c017d73107..b9d17362ae 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/InboundEnvelope.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/InboundEnvelope.scala
@@ -16,14 +16,13 @@ private[remote] object InboundEnvelope {
* Only used in tests
*/
def apply(
- recipient: OptionVal[InternalActorRef],
- recipientAddress: Address,
- message: AnyRef,
- sender: OptionVal[ActorRef],
- originUid: Long,
- association: OptionVal[OutboundContext]): InboundEnvelope = {
+ recipient: OptionVal[InternalActorRef],
+ message: AnyRef,
+ sender: OptionVal[ActorRef],
+ originUid: Long,
+ association: OptionVal[OutboundContext]): InboundEnvelope = {
val env = new ReusableInboundEnvelope
- env.init(recipient, recipientAddress, sender, originUid, -1, "", 0, null, association)
+ env.init(recipient, sender, originUid, -1, "", 0, null, association)
.withMessage(message)
}
@@ -116,15 +115,14 @@ private[akka] final class ReusableInboundEnvelope extends InboundEnvelope {
}
def init(
- recipient: OptionVal[InternalActorRef],
- recipientAddress: Address,
- sender: OptionVal[ActorRef],
- originUid: Long,
- serializer: Int,
- classManifest: String,
- flags: Byte,
- envelopeBuffer: EnvelopeBuffer,
- association: OptionVal[OutboundContext]): InboundEnvelope = {
+ recipient: OptionVal[InternalActorRef],
+ sender: OptionVal[ActorRef],
+ originUid: Long,
+ serializer: Int,
+ classManifest: String,
+ flags: Byte,
+ envelopeBuffer: EnvelopeBuffer,
+ association: OptionVal[OutboundContext]): InboundEnvelope = {
_recipient = recipient
_recipientAddress = recipientAddress
_sender = sender
diff --git a/akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala b/akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala
index 7525473479..b5dc011a6e 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala
@@ -71,13 +71,8 @@ private[akka] class MessageDispatcher(
case r @ (_: RemoteRef | _: RepointableRef) if !r.isLocal && !UntrustedMode ⇒
if (LogReceive) log.debug("received remote-destined message {}", msgLog)
- if (provider.transport.addresses(recipientAddress))
- // if it was originally addressed to us but is in fact remote from our point of view (i.e. remote-deployed)
- r.!(message)(sender)
- else
- log.error(
- "dropping message [{}] for non-local recipient [{}] arriving at [{}] inbound addresses are [{}]",
- message.getClass, r, recipientAddress, provider.transport.addresses.mkString(", "))
+ // if it was originally addressed to us but is in fact remote from our point of view (i.e. remote-deployed)
+ r.!(message)(sender)
case r ⇒ log.error(
"dropping message [{}] for unknown recipient [{}] arriving at [{}] inbound addresses are [{}]",
diff --git a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala
index 3e093fa5f8..1a6eed90d7 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala
@@ -24,8 +24,6 @@ import akka.stream.stage.OutHandler
import akka.stream.stage.TimerGraphStageLogic
import akka.remote.artery.OutboundHandshake.HandshakeReq
import akka.actor.ActorRef
-import akka.remote.PriorityMessage
-import akka.actor.ActorSelectionMessage
import akka.dispatch.sysmsg.SystemMessage
import scala.util.control.NoStackTrace
diff --git a/akka-remote/src/main/scala/akka/remote/serialization/ArteryMessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/serialization/ArteryMessageSerializer.scala
index 30499f86a4..ee2694a3ac 100644
--- a/akka-remote/src/main/scala/akka/remote/serialization/ArteryMessageSerializer.scala
+++ b/akka-remote/src/main/scala/akka/remote/serialization/ArteryMessageSerializer.scala
@@ -56,7 +56,7 @@ private[akka] final class ArteryMessageSerializer(val system: ExtendedActorSyste
override def toBinary(o: AnyRef): Array[Byte] = (o match { // most frequent ones first
case env: SystemMessageDelivery.SystemMessageEnvelope ⇒ serializeSystemMessageEnvelope(env)
case SystemMessageDelivery.Ack(seqNo, from) ⇒ serializeSystemMessageDeliveryAck(seqNo, from)
- case HandshakeReq(from) ⇒ serializeWithAddress(from)
+ case HandshakeReq(from, to) ⇒ serializeHandshakeReq(from, to)
case HandshakeRsp(from) ⇒ serializeWithAddress(from)
case SystemMessageDelivery.Nack(seqNo, from) ⇒ serializeSystemMessageDeliveryAck(seqNo, from)
case q: Quarantined ⇒ serializeQuarantined(q)
@@ -71,7 +71,7 @@ private[akka] final class ArteryMessageSerializer(val system: ExtendedActorSyste
override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match { // most frequent ones first (could be made a HashMap in the future)
case SystemMessageEnvelopeManifest ⇒ deserializeSystemMessageEnvelope(bytes)
case SystemMessageDeliveryAckManifest ⇒ deserializeSystemMessageDeliveryAck(bytes, SystemMessageDelivery.Ack)
- case HandshakeReqManifest ⇒ deserializeWithFromAddress(bytes, HandshakeReq)
+ case HandshakeReqManifest ⇒ deserializeHandshakeReq(bytes, HandshakeReq)
case HandshakeRspManifest ⇒ deserializeWithFromAddress(bytes, HandshakeRsp)
case SystemMessageDeliveryNackManifest ⇒ deserializeSystemMessageDeliveryAck(bytes, SystemMessageDelivery.Nack)
case QuarantinedManifest ⇒ deserializeQuarantined(ArteryControlFormats.Quarantined.parseFrom(bytes))
@@ -192,6 +192,17 @@ private[akka] final class ArteryMessageSerializer(val system: ExtendedActorSyste
def deserializeWithFromAddress(bytes: Array[Byte], create: UniqueAddress ⇒ AnyRef): AnyRef =
create(deserializeUniqueAddress(ArteryControlFormats.MessageWithAddress.parseFrom(bytes).getAddress))
+ def serializeHandshakeReq(from: UniqueAddress, to: Address): MessageLite =
+ ArteryControlFormats.HandshakeReq.newBuilder
+ .setFrom(serializeUniqueAddress(from))
+ .setTo(serializeAddress(to))
+ .build()
+
+ def deserializeHandshakeReq(bytes: Array[Byte], create: (UniqueAddress, Address) ⇒ HandshakeReq): HandshakeReq = {
+ val protoEnv = ArteryControlFormats.HandshakeReq.parseFrom(bytes)
+ create(deserializeUniqueAddress(protoEnv.getFrom), deserializeAddress(protoEnv.getTo))
+ }
+
def serializeUniqueAddress(address: UniqueAddress): ArteryControlFormats.UniqueAddress =
ArteryControlFormats.UniqueAddress.newBuilder()
.setAddress(serializeAddress(address.address))
diff --git a/akka-remote/src/test/scala/akka/remote/artery/HandshakeDenySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/HandshakeDenySpec.scala
new file mode 100644
index 0000000000..864c0bbe49
--- /dev/null
+++ b/akka-remote/src/test/scala/akka/remote/artery/HandshakeDenySpec.scala
@@ -0,0 +1,48 @@
+/**
+ * Copyright (C) 2016 Lightbend Inc.
+ */
+package akka.remote.artery
+
+import scala.concurrent.duration._
+
+import akka.actor.{ ActorIdentity, ActorSystem, Identify }
+import akka.testkit._
+import com.typesafe.config.ConfigFactory
+import akka.actor.RootActorPath
+
+object HandshakeDenySpec {
+
+ val commonConfig = ConfigFactory.parseString(s"""
+ akka.loglevel = WARNING
+ akka {
+ actor.provider = remote
+ remote.artery.enabled = on
+ remote.artery.canonical.hostname = localhost
+ remote.artery.canonical.port = 0
+ remote.artery.advanced.handshake-timeout = 2s
+ }
+ """)
+
+}
+
+class HandshakeDenySpec extends ArteryMultiNodeSpec(HandshakeDenySpec.commonConfig) with ImplicitSender {
+ import HandshakeDenySpec._
+
+ var systemB = newRemoteSystem(name = Some("systemB"))
+
+ "Artery handshake" must {
+
+ "be denied when originating address is unknown" in {
+ val sel = system.actorSelection(RootActorPath(address(systemB).copy(host = Some("127.0.0.1"))) / "user" / "echo")
+
+ systemB.actorOf(TestActors.echoActorProps, "echo")
+
+ EventFilter.warning(start = "Dropping Handshake Request from").intercept {
+ sel ! Identify(None)
+ expectNoMsg(3.seconds)
+ }(systemB)
+ }
+
+ }
+
+}
diff --git a/akka-remote/src/test/scala/akka/remote/artery/HandshakeRetrySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/HandshakeRetrySpec.scala
index 3e8feadf59..2ce40262e0 100644
--- a/akka-remote/src/test/scala/akka/remote/artery/HandshakeRetrySpec.scala
+++ b/akka-remote/src/test/scala/akka/remote/artery/HandshakeRetrySpec.scala
@@ -22,8 +22,7 @@ object HandshakeRetrySpec {
remote.artery.enabled = on
remote.artery.canonical.hostname = localhost
remote.artery.canonical.port = 0
- remote.handshake-timeout = 10s
-
+ remote.artery.advanced.handshake-timeout = 10s
}
""")
diff --git a/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala
index 44e3a26692..a6030bdffc 100644
--- a/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala
+++ b/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala
@@ -3,12 +3,9 @@
*/
package akka.remote.artery
-import scala.concurrent.duration._
import akka.actor.Address
-import akka.actor.InternalActorRef
import akka.remote.UniqueAddress
import akka.remote.artery.InboundControlJunction.ControlMessageObserver
-import akka.remote.artery.SystemMessageDelivery._
import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings
import akka.stream.scaladsl.Keep
@@ -49,14 +46,14 @@ class InboundControlJunctionSpec
val recipient = OptionVal.None // not used
val ((upstream, controlSubject), downstream) = TestSource.probe[AnyRef]
- .map(msg ⇒ InboundEnvelope(recipient, addressB.address, msg, OptionVal.None, addressA.uid, OptionVal.None))
+ .map(msg ⇒ InboundEnvelope(recipient, msg, OptionVal.None, addressA.uid, OptionVal.None))
.viaMat(new InboundControlJunction)(Keep.both)
.map { case env: InboundEnvelope ⇒ env.message }
.toMat(TestSink.probe[Any])(Keep.both)
.run()
controlSubject.attach(new ControlMessageObserver {
- override def notify(env: InboundEnvelope) {
+ override def notify(env: InboundEnvelope) = {
observerProbe.ref ! env.message
}
})
diff --git a/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala
index befdc927e0..96f1b7d648 100644
--- a/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala
+++ b/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala
@@ -6,11 +6,9 @@ package akka.remote.artery
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.Address
-import akka.actor.InternalActorRef
import akka.remote.UniqueAddress
import akka.remote.artery.OutboundHandshake.HandshakeReq
import akka.remote.artery.OutboundHandshake.HandshakeRsp
-import akka.remote.artery.SystemMessageDelivery._
import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings
import akka.stream.scaladsl.Keep
@@ -30,7 +28,6 @@ object InboundHandshakeSpec {
}
class InboundHandshakeSpec extends AkkaSpec with ImplicitSender {
- import InboundHandshakeSpec._
val matSettings = ActorMaterializerSettings(system).withFuzzing(true)
implicit val mat = ActorMaterializer(matSettings)(system)
@@ -41,7 +38,7 @@ class InboundHandshakeSpec extends AkkaSpec with ImplicitSender {
private def setupStream(inboundContext: InboundContext, timeout: FiniteDuration = 5.seconds): (TestPublisher.Probe[AnyRef], TestSubscriber.Probe[Any]) = {
val recipient = OptionVal.None // not used
TestSource.probe[AnyRef]
- .map(msg ⇒ InboundEnvelope(recipient, addressB.address, msg, OptionVal.None, addressA.uid,
+ .map(msg ⇒ InboundEnvelope(recipient, msg, OptionVal.None, addressA.uid,
inboundContext.association(addressA.uid)))
.via(new InboundHandshake(inboundContext, inControlStream = true))
.map { case env: InboundEnvelope ⇒ env.message }
@@ -57,7 +54,7 @@ class InboundHandshakeSpec extends AkkaSpec with ImplicitSender {
val (upstream, downstream) = setupStream(inboundContext)
downstream.request(10)
- upstream.sendNext(HandshakeReq(addressA))
+ upstream.sendNext(HandshakeReq(addressA, addressB.address))
upstream.sendNext("msg1")
replyProbe.expectMsg(HandshakeRsp(addressB))
downstream.expectNext("msg1")
@@ -69,7 +66,7 @@ class InboundHandshakeSpec extends AkkaSpec with ImplicitSender {
val (upstream, downstream) = setupStream(inboundContext)
downstream.request(10)
- upstream.sendNext(HandshakeReq(addressA))
+ upstream.sendNext(HandshakeReq(addressA, addressB.address))
upstream.sendNext("msg1")
downstream.expectNext("msg1")
val uniqueRemoteAddress = Await.result(
@@ -89,7 +86,7 @@ class InboundHandshakeSpec extends AkkaSpec with ImplicitSender {
downstream.expectNoMsg(200.millis) // messages from unknown are dropped
// and accept messages after handshake
- upstream.sendNext(HandshakeReq(addressA))
+ upstream.sendNext(HandshakeReq(addressA, addressB.address))
upstream.sendNext("msg18")
replyProbe.expectMsg(HandshakeRsp(addressB))
downstream.expectNext("msg18")
diff --git a/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala
index 3b044e654c..b5a87f0940 100644
--- a/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala
+++ b/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala
@@ -53,7 +53,7 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender {
val (upstream, downstream) = setupStream(outboundContext)
downstream.request(10)
- downstream.expectNext(HandshakeReq(addressA))
+ downstream.expectNext(HandshakeReq(addressA, addressB.address))
downstream.cancel()
}
@@ -65,7 +65,7 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender {
upstream.sendNext("msg1")
downstream.request(10)
- downstream.expectNext(HandshakeReq(addressA))
+ downstream.expectNext(HandshakeReq(addressA, addressB.address))
downstream.expectNext("msg1")
downstream.cancel()
}
@@ -76,7 +76,7 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender {
val (upstream, downstream) = setupStream(outboundContext, timeout = 200.millis)
downstream.request(1)
- downstream.expectNext(HandshakeReq(addressA))
+ downstream.expectNext(HandshakeReq(addressA, addressB.address))
downstream.expectError().getClass should be(classOf[HandshakeTimeoutException])
}
@@ -86,9 +86,9 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender {
val (upstream, downstream) = setupStream(outboundContext, retryInterval = 100.millis)
downstream.request(10)
- downstream.expectNext(HandshakeReq(addressA))
- downstream.expectNext(HandshakeReq(addressA))
- downstream.expectNext(HandshakeReq(addressA))
+ downstream.expectNext(HandshakeReq(addressA, addressB.address))
+ downstream.expectNext(HandshakeReq(addressA, addressB.address))
+ downstream.expectNext(HandshakeReq(addressA, addressB.address))
downstream.cancel()
}
@@ -98,7 +98,7 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender {
val (upstream, downstream) = setupStream(outboundContext)
downstream.request(10)
- downstream.expectNext(HandshakeReq(addressA))
+ downstream.expectNext(HandshakeReq(addressA, addressB.address))
upstream.sendNext("msg1")
downstream.expectNoMsg(200.millis)
// InboundHandshake stage will complete the handshake when receiving HandshakeRsp
@@ -116,7 +116,7 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender {
downstream.request(10)
upstream.sendNext("msg1")
- downstream.expectNext(HandshakeReq(addressA))
+ downstream.expectNext(HandshakeReq(addressA, addressB.address))
inboundContext.completeHandshake(addressB)
downstream.expectNext("msg1")
@@ -124,7 +124,7 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender {
upstream.sendNext("msg2")
upstream.sendNext("msg3")
upstream.sendNext("msg4")
- downstream.expectNext(HandshakeReq(addressA))
+ downstream.expectNext(HandshakeReq(addressA, addressB.address))
downstream.expectNext("msg2")
downstream.expectNext("msg3")
downstream.expectNext("msg4")
diff --git a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageAckerSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageAckerSpec.scala
index 5ba36ba0ef..fd0d44fdf1 100644
--- a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageAckerSpec.scala
+++ b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageAckerSpec.scala
@@ -34,7 +34,7 @@ class SystemMessageAckerSpec extends AkkaSpec with ImplicitSender {
TestSource.probe[AnyRef]
.map {
case sysMsg @ SystemMessageEnvelope(_, _, ackReplyTo) ⇒
- InboundEnvelope(recipient, addressA.address, sysMsg, OptionVal.None, ackReplyTo.uid,
+ InboundEnvelope(recipient, sysMsg, OptionVal.None, ackReplyTo.uid,
inboundContext.association(ackReplyTo.uid))
}
.via(new SystemMessageAcker(inboundContext))
diff --git a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala
index 9264c7d980..cc7ed61608 100644
--- a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala
+++ b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala
@@ -10,12 +10,10 @@ import scala.concurrent.duration._
import akka.NotUsed
import akka.actor.ActorIdentity
import akka.actor.ActorSystem
-import akka.actor.ExtendedActorSystem
import akka.actor.Identify
-import akka.actor.InternalActorRef
import akka.actor.PoisonPill
import akka.actor.RootActorPath
-import akka.remote.{ AddressUidExtension, RARP, RemoteActorRef, UniqueAddress }
+import akka.remote.{ AddressUidExtension, RARP, UniqueAddress }
import akka.remote.artery.SystemMessageDelivery._
import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings
@@ -79,7 +77,7 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi
Flow[OutboundEnvelope]
.map(outboundEnvelope ⇒ outboundEnvelope.message match {
case sysEnv: SystemMessageEnvelope ⇒
- InboundEnvelope(recipient, addressB.address, sysEnv, OptionVal.None, addressA.uid,
+ InboundEnvelope(recipient, sysEnv, OptionVal.None, addressA.uid,
inboundContext.association(addressA.uid))
})
.async
diff --git a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala
index db5965589d..422211634b 100644
--- a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala
+++ b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala
@@ -12,12 +12,10 @@ import scala.util.Success
import akka.Done
import akka.actor.ActorRef
import akka.actor.Address
-import akka.remote.RemoteActorRef
import akka.remote.UniqueAddress
import akka.remote.artery.InboundControlJunction.ControlMessageObserver
import akka.remote.artery.InboundControlJunction.ControlMessageSubject
import akka.util.OptionVal
-import akka.actor.InternalActorRef
import akka.dispatch.ExecutionContexts
import com.typesafe.config.ConfigFactory
@@ -94,7 +92,7 @@ private[remote] class TestOutboundContext(
override def sendControl(message: ControlMessage) = {
controlProbe.foreach(_ ! message)
- controlSubject.sendControl(InboundEnvelope(OptionVal.None, remoteAddress, message, OptionVal.None, localAddress.uid,
+ controlSubject.sendControl(InboundEnvelope(OptionVal.None, message, OptionVal.None, localAddress.uid,
OptionVal.None))
}
diff --git a/akka-remote/src/test/scala/akka/remote/serialization/ArteryMessageSerializerSpec.scala b/akka-remote/src/test/scala/akka/remote/serialization/ArteryMessageSerializerSpec.scala
index 2d247067b9..bdf5333632 100644
--- a/akka-remote/src/test/scala/akka/remote/serialization/ArteryMessageSerializerSpec.scala
+++ b/akka-remote/src/test/scala/akka/remote/serialization/ArteryMessageSerializerSpec.scala
@@ -22,7 +22,7 @@ class ArteryMessageSerializerSpec extends AkkaSpec {
"Quarantined" → Quarantined(uniqueAddress(), uniqueAddress()),
"ActorSystemTerminating" → ActorSystemTerminating(uniqueAddress()),
"ActorSystemTerminatingAck" → ActorSystemTerminatingAck(uniqueAddress()),
- "HandshakeReq" → HandshakeReq(uniqueAddress()),
+ "HandshakeReq" → HandshakeReq(uniqueAddress(), uniqueAddress().address),
"HandshakeRsp" → HandshakeRsp(uniqueAddress()),
"ActorRefCompressionAdvertisement" → ActorRefCompressionAdvertisement(uniqueAddress(), CompressionTable(17L, 123, Map(actorA → 123, actorB → 456, system.deadLetters → 0))),
"ActorRefCompressionAdvertisementAck" → ActorRefCompressionAdvertisementAck(uniqueAddress(), 23),