diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala
index f0903d4925..6203e053e2 100644
--- a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala
+++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala
@@ -64,7 +64,7 @@ class CodecBenchmark {
val uniqueLocalAddress = UniqueAddress(
system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress,
- AddressUidExtension(system).addressUid
+ AddressUidExtension(system).longAddressUid
)
val payload = Array.ofDim[Byte](1000)
diff --git a/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java b/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java
index 9793fa2616..15db086207 100644
--- a/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java
+++ b/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java
@@ -16041,6 +16041,24 @@ public final class ClusterMessages {
* required uint32 uid = 2;
*/
int getUid();
+
+ // optional uint32 uid2 = 3;
+ /**
+ * optional uint32 uid2 = 3;
+ *
+ *
+ * 64 bit uids but with backward wire compatibility + *+ */ + boolean hasUid2(); + /** + *
optional uint32 uid2 = 3;
+ *
+ * + * 64 bit uids but with backward wire compatibility + *+ */ + int getUid2(); } /** * Protobuf type {@code UniqueAddress} @@ -16116,6 +16134,11 @@ public final class ClusterMessages { uid_ = input.readUInt32(); break; } + case 24: { + bitField0_ |= 0x00000004; + uid2_ = input.readUInt32(); + break; + } } } } catch (akka.protobuf.InvalidProtocolBufferException e) { @@ -16194,9 +16217,34 @@ public final class ClusterMessages { return uid_; } + // optional uint32 uid2 = 3; + public static final int UID2_FIELD_NUMBER = 3; + private int uid2_; + /** + *
optional uint32 uid2 = 3;
+ *
+ * + * 64 bit uids but with backward wire compatibility + *+ */ + public boolean hasUid2() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + *
optional uint32 uid2 = 3;
+ *
+ * + * 64 bit uids but with backward wire compatibility + *+ */ + public int getUid2() { + return uid2_; + } + private void initFields() { address_ = akka.cluster.protobuf.msg.ClusterMessages.Address.getDefaultInstance(); uid_ = 0; + uid2_ = 0; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -16228,6 +16276,9 @@ public final class ClusterMessages { if (((bitField0_ & 0x00000002) == 0x00000002)) { output.writeUInt32(2, uid_); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeUInt32(3, uid2_); + } getUnknownFields().writeTo(output); } @@ -16245,6 +16296,10 @@ public final class ClusterMessages { size += akka.protobuf.CodedOutputStream .computeUInt32Size(2, uid_); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += akka.protobuf.CodedOutputStream + .computeUInt32Size(3, uid2_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -16375,6 +16430,8 @@ public final class ClusterMessages { bitField0_ = (bitField0_ & ~0x00000001); uid_ = 0; bitField0_ = (bitField0_ & ~0x00000002); + uid2_ = 0; + bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -16415,6 +16472,10 @@ public final class ClusterMessages { to_bitField0_ |= 0x00000002; } result.uid_ = uid_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + result.uid2_ = uid2_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -16437,6 +16498,9 @@ public final class ClusterMessages { if (other.hasUid()) { setUid(other.getUid()); } + if (other.hasUid2()) { + setUid2(other.getUid2()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -16626,6 +16690,55 @@ public final class ClusterMessages { return this; } + // optional uint32 uid2 = 3; + private int uid2_ ; + /** + *
optional uint32 uid2 = 3;
+ *
+ * + * 64 bit uids but with backward wire compatibility + *+ */ + public boolean hasUid2() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + *
optional uint32 uid2 = 3;
+ *
+ * + * 64 bit uids but with backward wire compatibility + *+ */ + public int getUid2() { + return uid2_; + } + /** + *
optional uint32 uid2 = 3;
+ *
+ * + * 64 bit uids but with backward wire compatibility + *+ */ + public Builder setUid2(int value) { + bitField0_ |= 0x00000004; + uid2_ = value; + onChanged(); + return this; + } + /** + *
optional uint32 uid2 = 3;
+ *
+ * + * 64 bit uids but with backward wire compatibility + *+ */ + public Builder clearUid2() { + bitField0_ = (bitField0_ & ~0x00000004); + uid2_ = 0; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:UniqueAddress) } @@ -16789,14 +16902,14 @@ public final class ClusterMessages { "\016\n\nSerialized\020\000\022\n\n\006Double\020\001\022\t\n\005Float\020\002\022\013" + "\n\007Integer\020\003\022\010\n\004Long\020\004\"\007\n\005Empty\"K\n\007Addres" + "s\022\016\n\006system\030\001 \002(\t\022\020\n\010hostname\030\002 \002(\t\022\014\n\004p" + - "ort\030\003 \002(\r\022\020\n\010protocol\030\004 \001(\t\"7\n\rUniqueAdd" + + "ort\030\003 \002(\r\022\020\n\010protocol\030\004 \001(\t\"E\n\rUniqueAdd" + "ress\022\031\n\007address\030\001 \002(\0132\010.Address\022\013\n\003uid\030\002" + - " \002(\r*D\n\022ReachabilityStatus\022\r\n\tReachable\020" + - "\000\022\017\n\013Unreachable\020\001\022\016\n\nTerminated\020\002*b\n\014Me" + - "mberStatus\022\013\n\007Joining\020\000\022\006\n\002Up\020\001\022\013\n\007Leavi" + - "ng\020\002\022\013\n\007Exiting\020\003\022\010\n\004Down\020\004\022\013\n\007Removed\020\005" + - "\022\014\n\010WeaklyUp\020\006B\035\n\031akka.cluster.protobuf.", - "msgH\001" + " \002(\r\022\014\n\004uid2\030\003 \001(\r*D\n\022ReachabilityStatus" + + "\022\r\n\tReachable\020\000\022\017\n\013Unreachable\020\001\022\016\n\nTerm" + + "inated\020\002*b\n\014MemberStatus\022\013\n\007Joining\020\000\022\006\n" + + "\002Up\020\001\022\013\n\007Leaving\020\002\022\013\n\007Exiting\020\003\022\010\n\004Down\020" + + "\004\022\013\n\007Removed\020\005\022\014\n\010WeaklyUp\020\006B\035\n\031akka.clu", + "ster.protobuf.msgH\001" }; akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -16922,7 +17035,7 @@ public final class ClusterMessages { internal_static_UniqueAddress_fieldAccessorTable = new akka.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_UniqueAddress_descriptor, - new java.lang.String[] { "Address", "Uid", }); + new java.lang.String[] { "Address", "Uid", "Uid2", }); return null; } }; diff --git a/akka-cluster/src/main/protobuf/ClusterMessages.proto b/akka-cluster/src/main/protobuf/ClusterMessages.proto index 8b280d421f..988899c73d 100644 --- a/akka-cluster/src/main/protobuf/ClusterMessages.proto +++ b/akka-cluster/src/main/protobuf/ClusterMessages.proto @@ -252,4 +252,6 @@ message Address { message UniqueAddress { required Address address = 1; required uint32 uid = 2; + // 64 bit uids but with backward wire compatibility + optional uint32 uid2 = 3; } diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index d8ec2cd930..582f9b82a9 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -67,7 +67,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { */ val selfUniqueAddress: UniqueAddress = system.provider match { case c: ClusterActorRefProvider ⇒ - UniqueAddress(c.transport.defaultAddress, AddressUidExtension(system).addressUid) + UniqueAddress(c.transport.defaultAddress, AddressUidExtension(system).longAddressUid) case other ⇒ throw new ConfigurationException( s"ActorSystem [${system}] needs to have a 'ClusterActorRefProvider' enabled in the configuration, currently uses [${other.getClass.getName}]") } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index a65ca8cb24..afb87d0253 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -250,7 +250,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with val NumberOfGossipsBeforeShutdownWhenLeaderExits = 3 val MaxGossipsBeforeShuttingDownMyself = 5 - def vclockName(node: UniqueAddress): String = node.address + "-" + node.uid + def vclockName(node: UniqueAddress): String = s"${node.address}-${node.longUid}" val vclockNode = VectorClock.Node(vclockName(selfUniqueAddress)) // note that self is not initially member, diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala index 5dd1ed3da8..534d23e273 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala @@ -93,7 +93,7 @@ private[cluster] class ClusterRemoteWatcher( // The reason we don't quarantine gracefully removed members (leaving) is that // Cluster Singleton need to exchange TakeOver/HandOver messages. if (previousStatus == MemberStatus.Down) { - quarantine(m.address, Some(m.uniqueAddress.uid), s"Cluster member removed, previous status [$previousStatus]") + quarantine(m.address, Some(m.uniqueAddress.longUid), s"Cluster member removed, previous status [$previousStatus]") } publishAddressTerminated(m.address) } diff --git a/akka-cluster/src/main/scala/akka/cluster/Member.scala b/akka-cluster/src/main/scala/akka/cluster/Member.scala index bd3f817587..37671623d7 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Member.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Member.scala @@ -7,6 +7,8 @@ package akka.cluster import akka.actor.Address import MemberStatus._ +import scala.runtime.AbstractFunction2 + /** * Represents the address, current status, and roles of a cluster member node. * @@ -243,18 +245,43 @@ object MemberStatus { Removed → Set.empty[MemberStatus]) } +object UniqueAddress extends AbstractFunction2[Address, Int, UniqueAddress] { + + // for binary compatibility + @deprecated("Use Long UID apply instead", since = "2.4.11") + def apply(address: Address, uid: Int) = new UniqueAddress(address, uid.toLong) + +} + /** * Member identifier consisting of address and random `uid`. * The `uid` is needed to be able to distinguish different * incarnations of a member with same hostname and port. */ @SerialVersionUID(1L) -final case class UniqueAddress(address: Address, uid: Int) extends Ordered[UniqueAddress] { - override def hashCode = uid +final case class UniqueAddress(address: Address, longUid: Long) extends Ordered[UniqueAddress] { + + override def hashCode = java.lang.Long.hashCode(longUid) def compare(that: UniqueAddress): Int = { val result = Member.addressOrdering.compare(this.address, that.address) - if (result == 0) if (this.uid < that.uid) -1 else if (this.uid == that.uid) 0 else 1 + if (result == 0) if (this.longUid < that.longUid) -1 else if (this.longUid == that.longUid) 0 else 1 else result } -} + + // for binary compatibility + + @deprecated("Use Long UID constructor instead", since = "2.4.11") + def this(address: Address, uid: Int) = this(address, uid.toLong) + + @deprecated("Use longUid instead", since = "2.4.11") + def uid = longUid.toInt + + /** + * For binary compatibility + * Stops `copy(Address, Long)` copy from being generated, use `apply` instead. + */ + @deprecated("Use Long UID constructor instead", since = "2.4.11") + def copy(address: Address = address, uid: Int = uid) = new UniqueAddress(address, uid) + +} \ No newline at end of file diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala index 78921cc0f3..970c8e1a6e 100644 --- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala @@ -126,8 +126,12 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri private def addressToProtoByteArray(address: Address): Array[Byte] = addressToProto(address).build.toByteArray - private def uniqueAddressToProto(uniqueAddress: UniqueAddress): cm.UniqueAddress.Builder = - cm.UniqueAddress.newBuilder().setAddress(addressToProto(uniqueAddress.address)).setUid(uniqueAddress.uid) + private def uniqueAddressToProto(uniqueAddress: UniqueAddress): cm.UniqueAddress.Builder = { + cm.UniqueAddress.newBuilder() + .setAddress(addressToProto(uniqueAddress.address)) + .setUid(uniqueAddress.longUid.toInt) + .setUid2((uniqueAddress.longUid >> 32).toInt) + } private def uniqueAddressToProtoByteArray(uniqueAddress: UniqueAddress): Array[Byte] = uniqueAddressToProto(uniqueAddress).build.toByteArray @@ -161,8 +165,19 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri private def addressFromProto(address: cm.Address): Address = Address(getProtocol(address), getSystem(address), address.getHostname, address.getPort) - private def uniqueAddressFromProto(uniqueAddress: cm.UniqueAddress): UniqueAddress = - UniqueAddress(addressFromProto(uniqueAddress.getAddress), uniqueAddress.getUid) + private def uniqueAddressFromProto(uniqueAddress: cm.UniqueAddress): UniqueAddress = { + + UniqueAddress( + addressFromProto(uniqueAddress.getAddress), + if (uniqueAddress.hasUid2) { + // new remote node join the two parts of the long uid back + (uniqueAddress.getUid2.toLong << 32) | (uniqueAddress.getUid & 0xFFFFFFFFL) + } else { + // old remote node + uniqueAddress.getUid.toLong + } + ) + } private val memberStatusToInt = scala.collection.immutable.HashMap[MemberStatus, Int]( MemberStatus.Joining → cm.MemberStatus.Joining_VALUE, diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNode3Spec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNode3Spec.scala index 895b3fb5bf..01cd5f8e00 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNode3Spec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNode3Spec.scala @@ -137,7 +137,7 @@ abstract class RestartNode3Spec awaitAssert { Cluster(system).readView.members.size should ===(3) Cluster(system).readView.members.exists { m ⇒ - m.address == secondUniqueAddress.address && m.uniqueAddress.uid != secondUniqueAddress.uid + m.address == secondUniqueAddress.address && m.uniqueAddress.longUid != secondUniqueAddress.longUid } } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNodeSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNodeSpec.scala index 17cad3e2b6..e478402dc6 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNodeSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNodeSpec.scala @@ -141,7 +141,7 @@ abstract class RestartNodeSpec awaitAssert { Cluster(system).readView.members.size should ===(3) Cluster(system).readView.members.exists { m ⇒ - m.address == secondUniqueAddress.address && m.uniqueAddress.uid != secondUniqueAddress.uid + m.address == secondUniqueAddress.address && m.uniqueAddress.longUid != secondUniqueAddress.longUid } } } diff --git a/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java b/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java index 69f9c4a156..ced5725d11 100644 --- a/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java +++ b/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java @@ -12421,6 +12421,24 @@ public final class ReplicatorMessages { *
required sfixed32 uid = 2;
*/
int getUid();
+
+ // optional sfixed32 uid2 = 3;
+ /**
+ * optional sfixed32 uid2 = 3;
+ *
+ * + * 64 bit uids but with backward wire compatibility + *+ */ + boolean hasUid2(); + /** + *
optional sfixed32 uid2 = 3;
+ *
+ * + * 64 bit uids but with backward wire compatibility + *+ */ + int getUid2(); } /** * Protobuf type {@code akka.cluster.ddata.UniqueAddress} @@ -12491,6 +12509,11 @@ public final class ReplicatorMessages { uid_ = input.readSFixed32(); break; } + case 29: { + bitField0_ |= 0x00000004; + uid2_ = input.readSFixed32(); + break; + } } } } catch (akka.protobuf.InvalidProtocolBufferException e) { @@ -12569,9 +12592,34 @@ public final class ReplicatorMessages { return uid_; } + // optional sfixed32 uid2 = 3; + public static final int UID2_FIELD_NUMBER = 3; + private int uid2_; + /** + *
optional sfixed32 uid2 = 3;
+ *
+ * + * 64 bit uids but with backward wire compatibility + *+ */ + public boolean hasUid2() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + *
optional sfixed32 uid2 = 3;
+ *
+ * + * 64 bit uids but with backward wire compatibility + *+ */ + public int getUid2() { + return uid2_; + } + private void initFields() { address_ = akka.cluster.ddata.protobuf.msg.ReplicatorMessages.Address.getDefaultInstance(); uid_ = 0; + uid2_ = 0; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -12603,6 +12651,9 @@ public final class ReplicatorMessages { if (((bitField0_ & 0x00000002) == 0x00000002)) { output.writeSFixed32(2, uid_); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeSFixed32(3, uid2_); + } getUnknownFields().writeTo(output); } @@ -12620,6 +12671,10 @@ public final class ReplicatorMessages { size += akka.protobuf.CodedOutputStream .computeSFixed32Size(2, uid_); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += akka.protobuf.CodedOutputStream + .computeSFixed32Size(3, uid2_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -12745,6 +12800,8 @@ public final class ReplicatorMessages { bitField0_ = (bitField0_ & ~0x00000001); uid_ = 0; bitField0_ = (bitField0_ & ~0x00000002); + uid2_ = 0; + bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -12785,6 +12842,10 @@ public final class ReplicatorMessages { to_bitField0_ |= 0x00000002; } result.uid_ = uid_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + result.uid2_ = uid2_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -12807,6 +12868,9 @@ public final class ReplicatorMessages { if (other.hasUid()) { setUid(other.getUid()); } + if (other.hasUid2()) { + setUid2(other.getUid2()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -12996,6 +13060,55 @@ public final class ReplicatorMessages { return this; } + // optional sfixed32 uid2 = 3; + private int uid2_ ; + /** + *
optional sfixed32 uid2 = 3;
+ *
+ * + * 64 bit uids but with backward wire compatibility + *+ */ + public boolean hasUid2() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + *
optional sfixed32 uid2 = 3;
+ *
+ * + * 64 bit uids but with backward wire compatibility + *+ */ + public int getUid2() { + return uid2_; + } + /** + *
optional sfixed32 uid2 = 3;
+ *
+ * + * 64 bit uids but with backward wire compatibility + *+ */ + public Builder setUid2(int value) { + bitField0_ |= 0x00000004; + uid2_ = value; + onChanged(); + return this; + } + /** + *
optional sfixed32 uid2 = 3;
+ *
+ * + * 64 bit uids but with backward wire compatibility + *+ */ + public Builder clearUid2() { + bitField0_ = (bitField0_ & ~0x00000004); + uid2_ = 0; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:akka.cluster.ddata.UniqueAddress) } @@ -14806,14 +14919,14 @@ public final class ReplicatorMessages { " \002(\010\0221\n\007entries\030\002 \003(\0132 .akka.cluster.dda" + "ta.Gossip.Entry\032H\n\005Entry\022\013\n\003key\030\001 \002(\t\0222\n" + "\010envelope\030\002 \002(\0132 .akka.cluster.ddata.Dat", - "aEnvelope\"J\n\rUniqueAddress\022,\n\007address\030\001 " + + "aEnvelope\"X\n\rUniqueAddress\022,\n\007address\030\001 " + "\002(\0132\033.akka.cluster.ddata.Address\022\013\n\003uid\030" + - "\002 \002(\017\")\n\007Address\022\020\n\010hostname\030\001 \002(\t\022\014\n\004po" + - "rt\030\002 \002(\r\"V\n\014OtherMessage\022\027\n\017enclosedMess" + - "age\030\001 \002(\014\022\024\n\014serializerId\030\002 \002(\005\022\027\n\017messa" + - "geManifest\030\004 \001(\014\"\036\n\nStringGSet\022\020\n\010elemen" + - "ts\030\001 \003(\tB#\n\037akka.cluster.ddata.protobuf." + - "msgH\001" + "\002 \002(\017\022\014\n\004uid2\030\003 \001(\017\")\n\007Address\022\020\n\010hostna" + + "me\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"V\n\014OtherMessage\022\027" + + "\n\017enclosedMessage\030\001 \002(\014\022\024\n\014serializerId\030" + + "\002 \002(\005\022\027\n\017messageManifest\030\004 \001(\014\"\036\n\nString" + + "GSet\022\020\n\010elements\030\001 \003(\tB#\n\037akka.cluster.d" + + "data.protobuf.msgH\001" }; akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -14927,7 +15040,7 @@ public final class ReplicatorMessages { internal_static_akka_cluster_ddata_UniqueAddress_fieldAccessorTable = new akka.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_akka_cluster_ddata_UniqueAddress_descriptor, - new java.lang.String[] { "Address", "Uid", }); + new java.lang.String[] { "Address", "Uid", "Uid2", }); internal_static_akka_cluster_ddata_Address_descriptor = getDescriptor().getMessageTypes().get(15); internal_static_akka_cluster_ddata_Address_fieldAccessorTable = new diff --git a/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto b/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto index c666716e3b..9d3a93b68b 100644 --- a/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto +++ b/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto @@ -98,6 +98,8 @@ message Gossip { message UniqueAddress { required Address address = 1; required sfixed32 uid = 2; + // 64 bit uids but with backward wire compatibility + optional sfixed32 uid2 = 3; } message Address { diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/SerializationSupport.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/SerializationSupport.scala index 11dab79f11..054ea0ab81 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/SerializationSupport.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/SerializationSupport.scala @@ -88,10 +88,21 @@ trait SerializationSupport { Address(addressProtocol, system.name, address.getHostname, address.getPort) def uniqueAddressToProto(uniqueAddress: UniqueAddress): dm.UniqueAddress.Builder = - dm.UniqueAddress.newBuilder().setAddress(addressToProto(uniqueAddress.address)).setUid(uniqueAddress.uid) + dm.UniqueAddress.newBuilder().setAddress(addressToProto(uniqueAddress.address)) + .setUid(uniqueAddress.longUid.toInt) + .setUid2((uniqueAddress.longUid >> 32).toInt) def uniqueAddressFromProto(uniqueAddress: dm.UniqueAddress): UniqueAddress = - UniqueAddress(addressFromProto(uniqueAddress.getAddress), uniqueAddress.getUid) + UniqueAddress( + addressFromProto(uniqueAddress.getAddress), + if (uniqueAddress.hasUid2) { + // new remote node join the two parts of the long uid back + (uniqueAddress.getUid2.toLong << 32) | (uniqueAddress.getUid & 0xFFFFFFFFL) + } else { + // old remote node + uniqueAddress.getUid.toLong + } + ) def resolveActorRef(path: String): ActorRef = system.provider.resolveActorRef(path) diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/PiercingShouldKeepQuarantineSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/PiercingShouldKeepQuarantineSpec.scala index 12efbba067..2ce5a016ae 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/PiercingShouldKeepQuarantineSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/PiercingShouldKeepQuarantineSpec.scala @@ -32,7 +32,7 @@ class ArteryPiercingShouldKeepQuarantineSpecMultiJvmNode2 extends PiercingShould object PiercingShouldKeepQuarantineSpec { class Subject extends Actor { def receive = { - case "getuid" ⇒ sender() ! AddressUidExtension(context.system).addressUid + case "getuid" ⇒ sender() ! AddressUidExtension(context.system).longAddressUid } } } @@ -52,7 +52,7 @@ abstract class PiercingShouldKeepQuarantineSpec(multiNodeConfig: PiercingShouldK // Communicate with second system system.actorSelection(node(second) / "user" / "subject") ! "getuid" - val uid = expectMsgType[Int](10.seconds) + val uid = expectMsgType[Long](10.seconds) enterBarrier("actor-identified") // Manually Quarantine the other system diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteQuarantinePiercingSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteQuarantinePiercingSpec.scala index 401a3bf104..e4d88b7fe1 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteQuarantinePiercingSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteQuarantinePiercingSpec.scala @@ -45,7 +45,7 @@ object RemoteQuarantinePiercingSpec { class Subject extends Actor { def receive = { case "shutdown" ⇒ context.system.terminate() - case "identify" ⇒ sender() ! (AddressUidExtension(context.system).addressUid → self) + case "identify" ⇒ sender() ! (AddressUidExtension(context.system).longAddressUid → self) } } } @@ -57,10 +57,10 @@ abstract class RemoteQuarantinePiercingSpec(multiNodeConfig: RemoteQuarantinePie override def initialParticipants = roles.size - def identifyWithUid(role: RoleName, actorName: String, timeout: FiniteDuration = remainingOrDefault): (Int, ActorRef) = { + def identifyWithUid(role: RoleName, actorName: String, timeout: FiniteDuration = remainingOrDefault): (Long, ActorRef) = { within(timeout) { system.actorSelection(node(role) / "user" / actorName) ! "identify" - expectMsgType[(Int, ActorRef)] + expectMsgType[(Long, ActorRef)] } } @@ -90,7 +90,7 @@ abstract class RemoteQuarantinePiercingSpec(multiNodeConfig: RemoteQuarantinePie // retry because the Subject actor might not be started yet awaitAssert { system.actorSelection(RootActorPath(secondAddress) / "user" / "subject") ! "identify" - val (uidSecond, subjectSecond) = expectMsgType[(Int, ActorRef)](1.second) + val (uidSecond, subjectSecond) = expectMsgType[(Long, ActorRef)](1.second) uidSecond should not be (uidFirst) subjectSecond should not be (subjectFirst) } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/HandshakeRestartReceiverSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/HandshakeRestartReceiverSpec.scala index 6e5ec4d812..a48cf210c6 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/HandshakeRestartReceiverSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/HandshakeRestartReceiverSpec.scala @@ -34,7 +34,7 @@ object HandshakeRestartReceiverSpec extends MultiNodeConfig { class Subject extends Actor { def receive = { case "shutdown" ⇒ context.system.terminate() - case "identify" ⇒ sender() ! (AddressUidExtension(context.system).addressUid → self) + case "identify" ⇒ sender() ! (AddressUidExtension(context.system).longAddressUid → self) } } @@ -55,10 +55,10 @@ abstract class HandshakeRestartReceiverSpec super.afterAll() } - def identifyWithUid(rootPath: ActorPath, actorName: String, timeout: FiniteDuration = remainingOrDefault): (Int, ActorRef) = { + def identifyWithUid(rootPath: ActorPath, actorName: String, timeout: FiniteDuration = remainingOrDefault): (Long, ActorRef) = { within(timeout) { system.actorSelection(rootPath / "user" / actorName) ! "identify" - expectMsgType[(Int, ActorRef)] + expectMsgType[(Long, ActorRef)] } } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/RemoteRestartedQuarantinedSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/RemoteRestartedQuarantinedSpec.scala index feb28d84a4..34d78ba270 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/RemoteRestartedQuarantinedSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/RemoteRestartedQuarantinedSpec.scala @@ -36,7 +36,7 @@ object RemoteRestartedQuarantinedSpec extends MultiNodeConfig { class Subject extends Actor { def receive = { case "shutdown" ⇒ context.system.terminate() - case "identify" ⇒ sender() ! (AddressUidExtension(context.system).addressUid → self) + case "identify" ⇒ sender() ! (AddressUidExtension(context.system).longAddressUid → self) } } @@ -51,10 +51,10 @@ abstract class RemoteRestartedQuarantinedSpec extends RemotingMultiNodeSpec(Remo override def initialParticipants = 2 - def identifyWithUid(role: RoleName, actorName: String, timeout: FiniteDuration = remainingOrDefault): (Int, ActorRef) = { + def identifyWithUid(role: RoleName, actorName: String, timeout: FiniteDuration = remainingOrDefault): (Long, ActorRef) = { within(timeout) { system.actorSelection(node(role) / "user" / actorName) ! "identify" - expectMsgType[(Int, ActorRef)] + expectMsgType[(Long, ActorRef)] } } diff --git a/akka-remote/src/main/java/akka/remote/ArteryControlFormats.java b/akka-remote/src/main/java/akka/remote/ArteryControlFormats.java index 22deb5c3cd..f1ed38b3a9 100644 --- a/akka-remote/src/main/java/akka/remote/ArteryControlFormats.java +++ b/akka-remote/src/main/java/akka/remote/ArteryControlFormats.java @@ -6821,6 +6821,420 @@ public final class ArteryControlFormats { // @@protoc_insertion_point(class_scope:UniqueAddress) } + public interface ArteryHeartbeatRspOrBuilder + extends akka.protobuf.MessageOrBuilder { + + // required uint64 uid = 1; + /** + *
required uint64 uid = 1;
+ */
+ boolean hasUid();
+ /**
+ * required uint64 uid = 1;
+ */
+ long getUid();
+ }
+ /**
+ * Protobuf type {@code ArteryHeartbeatRsp}
+ *
+ * + * RemoteWatcher.ArteryHeartbeat is empty array + * RemoteWatcher.ArteryHeartbeatRsp + *+ */ + public static final class ArteryHeartbeatRsp extends + akka.protobuf.GeneratedMessage + implements ArteryHeartbeatRspOrBuilder { + // Use ArteryHeartbeatRsp.newBuilder() to construct. + private ArteryHeartbeatRsp(akka.protobuf.GeneratedMessage.Builder> builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private ArteryHeartbeatRsp(boolean noInit) { this.unknownFields = akka.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final ArteryHeartbeatRsp defaultInstance; + public static ArteryHeartbeatRsp getDefaultInstance() { + return defaultInstance; + } + + public ArteryHeartbeatRsp getDefaultInstanceForType() { + return defaultInstance; + } + + private final akka.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final akka.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private ArteryHeartbeatRsp( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + akka.protobuf.UnknownFieldSet.Builder unknownFields = + akka.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 8: { + bitField0_ |= 0x00000001; + uid_ = input.readUInt64(); + break; + } + } + } + } catch (akka.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new akka.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final akka.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.remote.ArteryControlFormats.internal_static_ArteryHeartbeatRsp_descriptor; + } + + protected akka.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.remote.ArteryControlFormats.internal_static_ArteryHeartbeatRsp_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.remote.ArteryControlFormats.ArteryHeartbeatRsp.class, akka.remote.ArteryControlFormats.ArteryHeartbeatRsp.Builder.class); + } + + public static akka.protobuf.Parser
required uint64 uid = 1;
+ */
+ public boolean hasUid() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * required uint64 uid = 1;
+ */
+ public long getUid() {
+ return uid_;
+ }
+
+ private void initFields() {
+ uid_ = 0L;
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ if (!hasUid()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(akka.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeUInt64(1, uid_);
+ }
+ getUnknownFields().writeTo(output);
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += akka.protobuf.CodedOutputStream
+ .computeUInt64Size(1, uid_);
+ }
+ size += getUnknownFields().getSerializedSize();
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ public static akka.remote.ArteryControlFormats.ArteryHeartbeatRsp parseFrom(
+ akka.protobuf.ByteString data)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static akka.remote.ArteryControlFormats.ArteryHeartbeatRsp parseFrom(
+ akka.protobuf.ByteString data,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static akka.remote.ArteryControlFormats.ArteryHeartbeatRsp parseFrom(byte[] data)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data);
+ }
+ public static akka.remote.ArteryControlFormats.ArteryHeartbeatRsp parseFrom(
+ byte[] data,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws akka.protobuf.InvalidProtocolBufferException {
+ return PARSER.parseFrom(data, extensionRegistry);
+ }
+ public static akka.remote.ArteryControlFormats.ArteryHeartbeatRsp parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static akka.remote.ArteryControlFormats.ArteryHeartbeatRsp parseFrom(
+ java.io.InputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+ public static akka.remote.ArteryControlFormats.ArteryHeartbeatRsp parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input);
+ }
+ public static akka.remote.ArteryControlFormats.ArteryHeartbeatRsp parseDelimitedFrom(
+ java.io.InputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseDelimitedFrom(input, extensionRegistry);
+ }
+ public static akka.remote.ArteryControlFormats.ArteryHeartbeatRsp parseFrom(
+ akka.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input);
+ }
+ public static akka.remote.ArteryControlFormats.ArteryHeartbeatRsp parseFrom(
+ akka.protobuf.CodedInputStream input,
+ akka.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return PARSER.parseFrom(input, extensionRegistry);
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(akka.remote.ArteryControlFormats.ArteryHeartbeatRsp prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ akka.protobuf.GeneratedMessage.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ /**
+ * Protobuf type {@code ArteryHeartbeatRsp}
+ *
+ * + * RemoteWatcher.ArteryHeartbeat is empty array + * RemoteWatcher.ArteryHeartbeatRsp + *+ */ + public static final class Builder extends + akka.protobuf.GeneratedMessage.Builder
required uint64 uid = 1;
+ */
+ public boolean hasUid() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ /**
+ * required uint64 uid = 1;
+ */
+ public long getUid() {
+ return uid_;
+ }
+ /**
+ * required uint64 uid = 1;
+ */
+ public Builder setUid(long value) {
+ bitField0_ |= 0x00000001;
+ uid_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * required uint64 uid = 1;
+ */
+ public Builder clearUid() {
+ bitField0_ = (bitField0_ & ~0x00000001);
+ uid_ = 0L;
+ onChanged();
+ return this;
+ }
+
+ // @@protoc_insertion_point(builder_scope:ArteryHeartbeatRsp)
+ }
+
+ static {
+ defaultInstance = new ArteryHeartbeatRsp(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:ArteryHeartbeatRsp)
+ }
+
private static akka.protobuf.Descriptors.Descriptor
internal_static_Quarantined_descriptor;
private static
@@ -6866,6 +7280,11 @@ public final class ArteryControlFormats {
private static
akka.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_UniqueAddress_fieldAccessorTable;
+ private static akka.protobuf.Descriptors.Descriptor
+ internal_static_ArteryHeartbeatRsp_descriptor;
+ private static
+ akka.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_ArteryHeartbeatRsp_fieldAccessorTable;
public static akka.protobuf.Descriptors.FileDescriptor
getDescriptor() {
@@ -6894,8 +7313,9 @@ public final class ArteryControlFormats {
"2\016.UniqueAddress\"K\n\007Address\022\020\n\010protocol\030" +
"\001 \002(\t\022\016\n\006system\030\002 \002(\t\022\020\n\010hostname\030\003 \002(\t\022" +
"\014\n\004port\030\004 \002(\r\"7\n\rUniqueAddress\022\031\n\007addres" +
- "s\030\001 \002(\0132\010.Address\022\013\n\003uid\030\002 \002(\004B\017\n\013akka.r",
- "emoteH\001"
+ "s\030\001 \002(\0132\010.Address\022\013\n\003uid\030\002 \002(\004\"!\n\022Artery",
+ "HeartbeatRsp\022\013\n\003uid\030\001 \002(\004B\017\n\013akka.remote" +
+ "H\001"
};
akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -6956,6 +7376,12 @@ public final class ArteryControlFormats {
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_UniqueAddress_descriptor,
new java.lang.String[] { "Address", "Uid", });
+ internal_static_ArteryHeartbeatRsp_descriptor =
+ getDescriptor().getMessageTypes().get(9);
+ internal_static_ArteryHeartbeatRsp_fieldAccessorTable = new
+ akka.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_ArteryHeartbeatRsp_descriptor,
+ new java.lang.String[] { "Uid", });
return null;
}
};
diff --git a/akka-remote/src/main/protobuf/ArteryControlFormats.proto b/akka-remote/src/main/protobuf/ArteryControlFormats.proto
index d71d66fe63..8970092b9f 100644
--- a/akka-remote/src/main/protobuf/ArteryControlFormats.proto
+++ b/akka-remote/src/main/protobuf/ArteryControlFormats.proto
@@ -78,3 +78,10 @@ message UniqueAddress {
required Address address = 1;
required uint64 uid = 2;
}
+
+
+// RemoteWatcher.ArteryHeartbeat is empty array
+// RemoteWatcher.ArteryHeartbeatRsp
+message ArteryHeartbeatRsp {
+ required uint64 uid = 1;
+}
\ No newline at end of file
diff --git a/akka-remote/src/main/scala/akka/remote/AddressUidExtension.scala b/akka-remote/src/main/scala/akka/remote/AddressUidExtension.scala
index e83d6f7f42..ddec6fa52a 100644
--- a/akka-remote/src/main/scala/akka/remote/AddressUidExtension.scala
+++ b/akka-remote/src/main/scala/akka/remote/AddressUidExtension.scala
@@ -11,7 +11,9 @@ import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
/**
- * Extension that holds a uid that is assigned as a random `Int`.
+ * Extension that holds a uid that is assigned as a random `Long` or `Int` depending
+ * on which version of remoting that is used.
+ *
* The uid is intended to be used together with an [[akka.actor.Address]]
* to be able to distinguish restarted actor system using the same host
* and port.
@@ -22,15 +24,26 @@ object AddressUidExtension extends ExtensionId[AddressUidExtension] with Extensi
override def lookup = AddressUidExtension
override def createExtension(system: ExtendedActorSystem): AddressUidExtension = new AddressUidExtension(system)
+
}
class AddressUidExtension(val system: ExtendedActorSystem) extends Extension {
+
+ private def arteryEnabled = system.provider.asInstanceOf[RemoteActorRefProvider].remoteSettings.Artery.Enabled
+
val longAddressUid: Long = {
- // FIXME we should use a long here, but then we need to change in Cluster and RemoteWatcher also
- //ThreadLocalRandom.current.nextLong()
- ThreadLocalRandom.current.nextInt()
+ val tlr = ThreadLocalRandom.current
+ if (arteryEnabled) tlr.nextLong()
+ // with the old remoting we need to make toInt.toLong return the same number
+ // to keep wire compatibility
+ else tlr.nextInt().toLong
}
+ // used by old remoting and part of public api
@deprecated("Use longAddressUid instead", "2.4.x")
- val addressUid: Int = longAddressUid.toInt
+ lazy val addressUid: Int = {
+ if (arteryEnabled) {
+ throw new IllegalStateException("Int UID must never be used with Artery")
+ } else longAddressUid.toInt
+ }
}
diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala
index 28ced7a91b..187f9f1389 100644
--- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala
+++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala
@@ -455,7 +455,7 @@ private[akka] class RemoteActorRefProvider(
* @param uid UID of the remote system, if the uid is not defined it will not be a strong quarantine but
* the current endpoint writer will be stopped (dropping system messages) and the address will be gated
*/
- def quarantine(address: Address, uid: Option[Int], reason: String): Unit =
+ def quarantine(address: Address, uid: Option[Long], reason: String): Unit =
transport.quarantine(address, uid, reason)
}
diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala
index c8abdabcba..c3cc61cf5d 100644
--- a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala
+++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala
@@ -90,6 +90,6 @@ private[akka] abstract class RemoteTransport(val system: ExtendedActorSystem, va
* @param uid UID of the remote system, if the uid is not defined it will not be a strong quarantine but
* the current endpoint writer will be stopped (dropping system messages) and the address will be gated
*/
- def quarantine(address: Address, uid: Option[Int], reason: String): Unit
+ def quarantine(address: Address, uid: Option[Long], reason: String): Unit
}
diff --git a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala
index 33c8730f40..6df02b24f1 100644
--- a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala
+++ b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala
@@ -8,6 +8,7 @@ import akka.actor._
import akka.dispatch.sysmsg.{ DeathWatchNotification, Watch }
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import akka.event.AddressTerminatedTopic
+import akka.remote.artery.ArteryMessage
import scala.collection.mutable
import scala.concurrent.duration._
@@ -34,6 +35,10 @@ private[akka] object RemoteWatcher {
@SerialVersionUID(1L) case object Heartbeat extends HeartbeatMessage
@SerialVersionUID(1L) final case class HeartbeatRsp(addressUid: Int) extends HeartbeatMessage
+ // specific pair of messages for artery to allow for protobuf serialization and long uid
+ case object ArteryHeartbeat extends HeartbeatMessage with ArteryMessage
+ final case class ArteryHeartbeatRsp(uid: Long) extends HeartbeatMessage with ArteryMessage
+
// sent to self only
case object HeartbeatTick
case object ReapUnreachableTick
@@ -89,13 +94,12 @@ private[akka] class RemoteWatcher(
import context.dispatcher
def scheduler = context.system.scheduler
- val remoteProvider: RemoteActorRefProvider = context.system.asInstanceOf[ExtendedActorSystem].provider match {
- case rarp: RemoteActorRefProvider ⇒ rarp
- case other ⇒ throw new ConfigurationException(
- s"ActorSystem [${context.system}] needs to have a 'RemoteActorRefProvider' enabled in the configuration, currently uses [${other.getClass.getName}]")
- }
+ val remoteProvider: RemoteActorRefProvider = RARP(context.system).provider
+ val artery = remoteProvider.remoteSettings.Artery.Enabled
- val selfHeartbeatRspMsg = HeartbeatRsp(AddressUidExtension(context.system).addressUid)
+ val (heartBeatMsg, selfHeartbeatRspMsg) =
+ if (artery) (ArteryHeartbeat, ArteryHeartbeatRsp(AddressUidExtension(context.system).longAddressUid))
+ else (Heartbeat, HeartbeatRsp(AddressUidExtension(context.system).addressUid))
// actors that this node is watching, map of watchee -> Set(watchers)
val watching = new mutable.HashMap[InternalActorRef, mutable.Set[InternalActorRef]]() with mutable.MultiMap[InternalActorRef, InternalActorRef]
@@ -105,7 +109,7 @@ private[akka] class RemoteWatcher(
def watchingNodes = watcheeByNodes.keySet
var unreachable: Set[Address] = Set.empty
- var addressUids: Map[Address, Int] = Map.empty
+ var addressUids: Map[Address, Long] = Map.empty
val heartbeatTask = scheduler.schedule(heartbeatInterval, heartbeatInterval, self, HeartbeatTick)
val failureDetectorReaperTask = scheduler.schedule(unreachableReaperInterval, unreachableReaperInterval,
@@ -119,8 +123,9 @@ private[akka] class RemoteWatcher(
def receive = {
case HeartbeatTick ⇒ sendHeartbeat()
- case Heartbeat ⇒ receiveHeartbeat()
- case HeartbeatRsp(uid) ⇒ receiveHeartbeatRsp(uid)
+ case Heartbeat | ArteryHeartbeat ⇒ receiveHeartbeat()
+ case HeartbeatRsp(uid) ⇒ receiveHeartbeatRsp(uid.toLong)
+ case ArteryHeartbeatRsp(uid) ⇒ receiveHeartbeatRsp(uid)
case ReapUnreachableTick ⇒ reapUnreachable()
case ExpectedFirstHeartbeat(from) ⇒ triggerFirstHeartbeat(from)
case WatchRemote(watchee, watcher) ⇒ addWatch(watchee, watcher)
@@ -138,7 +143,7 @@ private[akka] class RemoteWatcher(
def receiveHeartbeat(): Unit =
sender() ! selfHeartbeatRspMsg
- def receiveHeartbeatRsp(uid: Int): Unit = {
+ def receiveHeartbeatRsp(uid: Long): Unit = {
val from = sender().path.address
if (failureDetector.isMonitoring(from))
@@ -167,7 +172,7 @@ private[akka] class RemoteWatcher(
def publishAddressTerminated(address: Address): Unit =
AddressTerminatedTopic(context.system).publish(AddressTerminated(address))
- def quarantine(address: Address, uid: Option[Int], reason: String): Unit =
+ def quarantine(address: Address, uid: Option[Long], reason: String): Unit =
remoteProvider.quarantine(address, uid, reason)
def addWatch(watchee: InternalActorRef, watcher: InternalActorRef): Unit = {
@@ -256,7 +261,7 @@ private[akka] class RemoteWatcher(
// other side a chance to reply, and also trigger some resends if needed
scheduler.scheduleOnce(heartbeatExpectedResponseAfter, self, ExpectedFirstHeartbeat(a))
}
- context.actorSelection(RootActorPath(a) / self.path.elements) ! Heartbeat
+ context.actorSelection(RootActorPath(a) / self.path.elements) ! heartBeatMsg
}
}
diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala
index 415925a42f..127c04ac9b 100644
--- a/akka-remote/src/main/scala/akka/remote/Remoting.scala
+++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala
@@ -230,10 +230,11 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
case None ⇒ throw new RemoteTransportExceptionNoStackTrace("Attempted to send management command but Remoting is not running.", null)
}
- override def quarantine(remoteAddress: Address, uid: Option[Int], reason: String): Unit = endpointManager match {
- case Some(manager) ⇒ manager ! Quarantine(remoteAddress, uid)
+ override def quarantine(remoteAddress: Address, uid: Option[Long], reason: String): Unit = endpointManager match {
+ case Some(manager) ⇒
+ manager ! Quarantine(remoteAddress, uid.map(_.toInt))
case _ ⇒ throw new RemoteTransportExceptionNoStackTrace(
- s"Attempted to quarantine address [$remoteAddress] with uid [$uid] but Remoting is not running", null)
+ s"Attempted to quarantine address [$remoteAddress] with UID [$uid] but Remoting is not running", null)
}
private[akka] def boundAddresses: Map[String, Set[Address]] = {
diff --git a/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala b/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala
index 1f8350640d..c57a576078 100644
--- a/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala
+++ b/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala
@@ -4,9 +4,11 @@
package akka.remote
import akka.event.Logging.LogLevel
-import akka.event.{ LoggingAdapter, Logging }
+import akka.event.{ Logging, LoggingAdapter }
import akka.actor.{ ActorSystem, Address }
+import scala.runtime.AbstractFunction2
+
@SerialVersionUID(1L)
sealed trait RemotingLifecycleEvent extends Serializable {
def logLevel: Logging.LogLevel
@@ -79,13 +81,32 @@ final case class RemotingErrorEvent(cause: Throwable) extends RemotingLifecycleE
override def toString: String = s"Remoting error: [${cause.getMessage}] [${Logging.stackTraceFor(cause)}]"
}
+// For binary compatibility
+object QuarantinedEvent extends AbstractFunction2[Address, Int, QuarantinedEvent] {
+
+ @deprecated("Use long uid apply")
+ def apply(address: Address, uid: Int) = new QuarantinedEvent(address, uid)
+}
+
@SerialVersionUID(1L)
-final case class QuarantinedEvent(address: Address, uid: Int) extends RemotingLifecycleEvent {
+final case class QuarantinedEvent(address: Address, longUid: Long) extends RemotingLifecycleEvent {
+
override def logLevel: Logging.LogLevel = Logging.WarningLevel
override val toString: String =
- s"Association to [$address] having UID [$uid] is irrecoverably failed. UID is now quarantined and all " +
+ s"Association to [$address] having UID [$longUid] is irrecoverably failed. UID is now quarantined and all " +
"messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover " +
"from this situation."
+
+ // For binary compatibility
+
+ @deprecated("Use long uid constructor")
+ def this(address: Address, uid: Int) = this(address, uid.toLong)
+
+ @deprecated("Use long uid")
+ def uid: Int = longUid.toInt
+
+ @deprecated("Use long uid copy method")
+ def copy(address: Address = address, uid: Int = uid) = new QuarantinedEvent(address, uid)
}
@SerialVersionUID(1L)
diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala
index 0d50d89305..b1da8af71b 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala
@@ -431,7 +431,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
runInboundStreams()
topLevelFREvents.loFreq(Transport_StartupFinished, NoMetaData)
- log.info("Remoting started; listening on address: [{}] with uid [{}]", localAddress.address, localAddress.uid)
+ log.info("Remoting started; listening on address: [{}] with UID [{}]", localAddress.address, localAddress.uid)
}
private lazy val shutdownHook = new Thread {
@@ -891,10 +891,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
}
}
- override def quarantine(remoteAddress: Address, uid: Option[Int], reason: String): Unit = {
+ override def quarantine(remoteAddress: Address, uid: Option[Long], reason: String): Unit = {
try {
- // FIXME use Long uid
- association(remoteAddress).quarantine(reason, uid.map(_.toLong))
+ association(remoteAddress).quarantine(reason, uid)
} catch {
case ShuttingDown ⇒ // silence it
}
diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala
index eaaa875f3f..1da210f872 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala
@@ -434,8 +434,7 @@ private[remote] class Association(
"messages to this UID will be delivered to dead letters. " +
"Remote actorsystem must be restarted to recover from this situation. {}",
remoteAddress, u, reason)
- // FIXME when we complete the switch to Long UID we must use Long here also, issue #20644
- transport.system.eventStream.publish(QuarantinedEvent(remoteAddress, u.toInt))
+ transport.system.eventStream.publish(QuarantinedEvent(remoteAddress, u))
clearOutboundCompression()
clearInboundCompression(u)
// end delivery of system messages to that incarnation after this point
diff --git a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala
index 0c21194e43..06fed362fe 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala
@@ -356,7 +356,7 @@ private[remote] final class HeaderBuilderImpl(
"HeaderBuilderImpl(" +
"version:" + version + ", " +
"flags:" + ByteFlag.binaryLeftPad(flags) + ", " +
- "uid:" + uid + ", " +
+ "UID:" + uid + ", " +
"_senderActorRef:" + _senderActorRef + ", " +
"_senderActorRefIdx:" + _senderActorRefIdx + ", " +
"_recipientActorRef:" + _recipientActorRef + ", " +
diff --git a/akka-remote/src/main/scala/akka/remote/serialization/ArteryMessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/serialization/ArteryMessageSerializer.scala
index ee2694a3ac..8e31f0ffdf 100644
--- a/akka-remote/src/main/scala/akka/remote/serialization/ArteryMessageSerializer.scala
+++ b/akka-remote/src/main/scala/akka/remote/serialization/ArteryMessageSerializer.scala
@@ -5,11 +5,12 @@ package akka.remote.serialization
import akka.actor.{ ActorRef, Address, ExtendedActorSystem }
import akka.protobuf.MessageLite
+import akka.remote.RemoteWatcher.ArteryHeartbeatRsp
import akka.remote.artery.OutboundHandshake.{ HandshakeReq, HandshakeRsp }
import akka.remote.artery.compress.CompressionProtocol._
import akka.remote.artery.compress.{ CompressionProtocol, CompressionTable }
import akka.remote.artery.{ ActorSystemTerminating, ActorSystemTerminatingAck, Quarantined, SystemMessageDelivery }
-import akka.remote.{ ArteryControlFormats, MessageSerializer, UniqueAddress, WireFormats }
+import akka.remote._
import akka.serialization.{ BaseSerializer, Serialization, SerializationExtension, SerializerWithStringManifest }
/** INTERNAL API */
@@ -27,6 +28,9 @@ private[akka] object ArteryMessageSerializer {
private val SystemMessageDeliveryAckManifest = "k"
private val SystemMessageDeliveryNackManifest = "l"
+ private val ArteryHeartbeatManifest = "m"
+ private val ArteryHeartbeatRspManifest = "n"
+
private final val DeadLettersRepresentation = ""
}
@@ -41,6 +45,8 @@ private[akka] final class ArteryMessageSerializer(val system: ExtendedActorSyste
case _: SystemMessageDelivery.Ack ⇒ SystemMessageDeliveryAckManifest
case _: HandshakeReq ⇒ HandshakeReqManifest
case _: HandshakeRsp ⇒ HandshakeRspManifest
+ case _: RemoteWatcher.ArteryHeartbeat.type ⇒ ArteryHeartbeatManifest
+ case _: RemoteWatcher.ArteryHeartbeatRsp ⇒ ArteryHeartbeatRspManifest
case _: SystemMessageDelivery.Nack ⇒ SystemMessageDeliveryNackManifest
case _: Quarantined ⇒ QuarantinedManifest
case _: ActorSystemTerminating ⇒ ActorSystemTerminatingManifest
@@ -53,20 +59,22 @@ private[akka] final class ArteryMessageSerializer(val system: ExtendedActorSyste
throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]")
}
- override def toBinary(o: AnyRef): Array[Byte] = (o match { // most frequent ones first
- case env: SystemMessageDelivery.SystemMessageEnvelope ⇒ serializeSystemMessageEnvelope(env)
- case SystemMessageDelivery.Ack(seqNo, from) ⇒ serializeSystemMessageDeliveryAck(seqNo, from)
- case HandshakeReq(from, to) ⇒ serializeHandshakeReq(from, to)
- case HandshakeRsp(from) ⇒ serializeWithAddress(from)
- case SystemMessageDelivery.Nack(seqNo, from) ⇒ serializeSystemMessageDeliveryAck(seqNo, from)
- case q: Quarantined ⇒ serializeQuarantined(q)
- case ActorSystemTerminating(from) ⇒ serializeWithAddress(from)
- case ActorSystemTerminatingAck(from) ⇒ serializeWithAddress(from)
- case adv: ActorRefCompressionAdvertisement ⇒ serializeActorRefCompressionAdvertisement(adv)
- case ActorRefCompressionAdvertisementAck(from, id) ⇒ serializeCompressionTableAdvertisementAck(from, id)
- case adv: ClassManifestCompressionAdvertisement ⇒ serializeCompressionAdvertisement(adv)(identity)
- case ClassManifestCompressionAdvertisementAck(from, id) ⇒ serializeCompressionTableAdvertisementAck(from, id)
- }).toByteArray
+ override def toBinary(o: AnyRef): Array[Byte] = o match { // most frequent ones first
+ case env: SystemMessageDelivery.SystemMessageEnvelope ⇒ serializeSystemMessageEnvelope(env).toByteArray
+ case SystemMessageDelivery.Ack(seqNo, from) ⇒ serializeSystemMessageDeliveryAck(seqNo, from).toByteArray
+ case HandshakeReq(from, to) ⇒ serializeHandshakeReq(from, to).toByteArray
+ case HandshakeRsp(from) ⇒ serializeWithAddress(from).toByteArray
+ case RemoteWatcher.ArteryHeartbeat ⇒ Array.emptyByteArray
+ case RemoteWatcher.ArteryHeartbeatRsp(from) ⇒ serializeArteryHeartbeatRsp(from).toByteArray
+ case SystemMessageDelivery.Nack(seqNo, from) ⇒ serializeSystemMessageDeliveryAck(seqNo, from).toByteArray
+ case q: Quarantined ⇒ serializeQuarantined(q).toByteArray
+ case ActorSystemTerminating(from) ⇒ serializeWithAddress(from).toByteArray
+ case ActorSystemTerminatingAck(from) ⇒ serializeWithAddress(from).toByteArray
+ case adv: ActorRefCompressionAdvertisement ⇒ serializeActorRefCompressionAdvertisement(adv).toByteArray
+ case ActorRefCompressionAdvertisementAck(from, id) ⇒ serializeCompressionTableAdvertisementAck(from, id).toByteArray
+ case adv: ClassManifestCompressionAdvertisement ⇒ serializeCompressionAdvertisement(adv)(identity).toByteArray
+ case ClassManifestCompressionAdvertisementAck(from, id) ⇒ serializeCompressionTableAdvertisementAck(from, id).toByteArray
+ }
override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match { // most frequent ones first (could be made a HashMap in the future)
case SystemMessageEnvelopeManifest ⇒ deserializeSystemMessageEnvelope(bytes)
@@ -81,6 +89,8 @@ private[akka] final class ArteryMessageSerializer(val system: ExtendedActorSyste
case ActorRefCompressionAdvertisementAckManifest ⇒ deserializeCompressionTableAdvertisementAck(bytes, ActorRefCompressionAdvertisementAck)
case ClassManifestCompressionAdvertisementManifest ⇒ deserializeCompressionAdvertisement(bytes, identity, ClassManifestCompressionAdvertisement)
case ClassManifestCompressionAdvertisementAckManifest ⇒ deserializeCompressionTableAdvertisementAck(bytes, ClassManifestCompressionAdvertisementAck)
+ case ArteryHeartbeatManifest ⇒ RemoteWatcher.ArteryHeartbeat
+ case ArteryHeartbeatRspManifest ⇒ deserializeArteryHeartbeatRsp(bytes, ArteryHeartbeatRsp)
case _ ⇒ throw new IllegalArgumentException(s"Manifest '$manifest' not defined for ArteryControlMessageSerializer (serializer id $identifier)")
}
@@ -226,4 +236,12 @@ private[akka] final class ArteryMessageSerializer(val system: ExtendedActorSyste
def deserializeAddress(address: ArteryControlFormats.Address): Address =
Address(address.getProtocol, address.getSystem, address.getHostname, address.getPort)
+
+ def serializeArteryHeartbeatRsp(uid: Long): ArteryControlFormats.ArteryHeartbeatRsp =
+ ArteryControlFormats.ArteryHeartbeatRsp.newBuilder().setUid(uid).build()
+
+ def deserializeArteryHeartbeatRsp(bytes: Array[Byte], create: Long ⇒ ArteryHeartbeatRsp): ArteryHeartbeatRsp = {
+ val msg = ArteryControlFormats.ArteryHeartbeatRsp.parseFrom(bytes)
+ create(msg.getUid)
+ }
}
diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala
index 4777778a97..6a209512f8 100644
--- a/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala
+++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala
@@ -151,7 +151,7 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec {
ByteString.ByteString1C(AkkaProtocolMessage.newBuilder().setPayload(PByteString.copyFrom(payload.asByteBuffer)).build.toByteArray) //Reuse Byte Array (naughty!)
override def constructAssociate(info: HandshakeInfo): ByteString = {
- val handshakeInfo = AkkaHandshakeInfo.newBuilder.setOrigin(serializeAddress(info.origin)).setUid(info.uid)
+ val handshakeInfo = AkkaHandshakeInfo.newBuilder.setOrigin(serializeAddress(info.origin)).setUid(info.uid.toLong)
info.cookie foreach handshakeInfo.setCookie
constructControlMessagePdu(WireFormats.CommandType.ASSOCIATE, Some(handshakeInfo))
}
diff --git a/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala
index 56ff462e8e..6be790e272 100644
--- a/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala
+++ b/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala
@@ -37,7 +37,7 @@ object RemoteWatcherSpec {
object TestRemoteWatcher {
final case class AddressTerm(address: Address)
- final case class Quarantined(address: Address, uid: Option[Int])
+ final case class Quarantined(address: Address, uid: Option[Long])
}
class TestRemoteWatcher(heartbeatExpectedResponseAfter: FiniteDuration) extends RemoteWatcher(
@@ -53,7 +53,7 @@ object RemoteWatcherSpec {
// that doesn't interfere with the real watch that is going on in the background
context.system.eventStream.publish(TestRemoteWatcher.AddressTerm(address))
- override def quarantine(address: Address, uid: Option[Int], reason: String): Unit = {
+ override def quarantine(address: Address, uid: Option[Long], reason: String): Unit = {
// don't quarantine in remoting, but publish a testable message
context.system.eventStream.publish(TestRemoteWatcher.Quarantined(address, uid))
}
diff --git a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala
index 701cc5c670..252a76299e 100644
--- a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala
+++ b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala
@@ -773,7 +773,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
inboundHandleProbe.expectNoMsg(1.second)
// Quarantine the connection
- RARP(thisSystem).provider.quarantine(remoteAddress, Some(remoteUID), "test")
+ RARP(thisSystem).provider.quarantine(remoteAddress, Some(remoteUID.toLong), "test")
// Even though the connection is stashed it will be disassociated
inboundHandleProbe.expectMsgType[AssociationHandle.Disassociated]
diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteWatcherSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteWatcherSpec.scala
index 97a8e22872..dc236b272f 100644
--- a/akka-remote/src/test/scala/akka/remote/artery/RemoteWatcherSpec.scala
+++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteWatcherSpec.scala
@@ -38,7 +38,7 @@ object RemoteWatcherSpec {
object TestRemoteWatcher {
final case class AddressTerm(address: Address) extends JavaSerializable
- final case class Quarantined(address: Address, uid: Option[Int]) extends JavaSerializable
+ final case class Quarantined(address: Address, uid: Option[Long]) extends JavaSerializable
}
class TestRemoteWatcher(heartbeatExpectedResponseAfter: FiniteDuration) extends RemoteWatcher(
@@ -54,7 +54,7 @@ object RemoteWatcherSpec {
// that doesn't interfere with the real watch that is going on in the background
context.system.eventStream.publish(TestRemoteWatcher.AddressTerm(address))
- override def quarantine(address: Address, uid: Option[Int], reason: String): Unit = {
+ override def quarantine(address: Address, uid: Option[Long], reason: String): Unit = {
// don't quarantine in remoting, but publish a testable message
context.system.eventStream.publish(TestRemoteWatcher.Quarantined(address, uid))
}
@@ -80,7 +80,7 @@ class RemoteWatcherSpec extends AkkaSpec(
val remoteSystem = ActorSystem("RemoteSystem", system.settings.config)
val remoteAddress = RARP(remoteSystem).provider.getDefaultAddress
- def remoteAddressUid = AddressUidExtension(remoteSystem).addressUid
+ def remoteAddressUid = AddressUidExtension(remoteSystem).longAddressUid
Seq(system, remoteSystem).foreach(muteDeadLetters(
akka.remote.transport.AssociationHandle.Disassociated.getClass,
@@ -90,7 +90,7 @@ class RemoteWatcherSpec extends AkkaSpec(
shutdown(remoteSystem)
}
- val heartbeatRspB = HeartbeatRsp(remoteAddressUid)
+ val heartbeatRspB = ArteryHeartbeatRsp(remoteAddressUid)
def createRemoteActor(props: Props, name: String): InternalActorRef = {
remoteSystem.actorOf(props, name)
@@ -119,14 +119,14 @@ class RemoteWatcherSpec extends AkkaSpec(
expectMsg(Stats.counts(watching = 3, watchingNodes = 1))
expectNoMsg(100 millis)
monitorA ! HeartbeatTick
- expectMsg(Heartbeat)
+ expectMsg(ArteryHeartbeat)
expectNoMsg(100 millis)
monitorA ! HeartbeatTick
- expectMsg(Heartbeat)
+ expectMsg(ArteryHeartbeat)
expectNoMsg(100 millis)
monitorA.tell(heartbeatRspB, monitorB)
monitorA ! HeartbeatTick
- expectMsg(Heartbeat)
+ expectMsg(ArteryHeartbeat)
expectNoMsg(100 millis)
monitorA ! UnwatchRemote(b1, a1)
@@ -135,7 +135,7 @@ class RemoteWatcherSpec extends AkkaSpec(
expectMsg(Stats.counts(watching = 2, watchingNodes = 1))
expectNoMsg(100 millis)
monitorA ! HeartbeatTick
- expectMsg(Heartbeat)
+ expectMsg(ArteryHeartbeat)
expectNoMsg(100 millis)
monitorA ! UnwatchRemote(b2, a2)
@@ -144,7 +144,7 @@ class RemoteWatcherSpec extends AkkaSpec(
expectMsg(Stats.counts(watching = 1, watchingNodes = 1))
expectNoMsg(100 millis)
monitorA ! HeartbeatTick
- expectMsg(Heartbeat)
+ expectMsg(ArteryHeartbeat)
expectNoMsg(100 millis)
monitorA ! UnwatchRemote(b2, a1)
@@ -176,17 +176,17 @@ class RemoteWatcherSpec extends AkkaSpec(
monitorA ! WatchRemote(b, a)
monitorA ! HeartbeatTick
- expectMsg(Heartbeat)
+ expectMsg(ArteryHeartbeat)
monitorA.tell(heartbeatRspB, monitorB)
expectNoMsg(1 second)
monitorA ! HeartbeatTick
- expectMsg(Heartbeat)
+ expectMsg(ArteryHeartbeat)
monitorA.tell(heartbeatRspB, monitorB)
within(10 seconds) {
awaitAssert {
monitorA ! HeartbeatTick
- expectMsg(Heartbeat)
+ expectMsg(ArteryHeartbeat)
// but no HeartbeatRsp
monitorA ! ReapUnreachableTick
p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address))
@@ -215,13 +215,13 @@ class RemoteWatcherSpec extends AkkaSpec(
monitorA ! WatchRemote(b, a)
monitorA ! HeartbeatTick
- expectMsg(Heartbeat)
+ expectMsg(ArteryHeartbeat)
// no HeartbeatRsp sent
within(20 seconds) {
awaitAssert {
monitorA ! HeartbeatTick
- expectMsg(Heartbeat)
+ expectMsg(ArteryHeartbeat)
// but no HeartbeatRsp
monitorA ! ReapUnreachableTick
p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address))
@@ -249,17 +249,17 @@ class RemoteWatcherSpec extends AkkaSpec(
monitorA ! WatchRemote(b, a)
monitorA ! HeartbeatTick
- expectMsg(Heartbeat)
+ expectMsg(ArteryHeartbeat)
monitorA.tell(heartbeatRspB, monitorB)
expectNoMsg(1 second)
monitorA ! HeartbeatTick
- expectMsg(Heartbeat)
+ expectMsg(ArteryHeartbeat)
monitorA.tell(heartbeatRspB, monitorB)
within(10 seconds) {
awaitAssert {
monitorA ! HeartbeatTick
- expectMsg(Heartbeat)
+ expectMsg(ArteryHeartbeat)
// but no HeartbeatRsp
monitorA ! ReapUnreachableTick
p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address))
@@ -281,21 +281,21 @@ class RemoteWatcherSpec extends AkkaSpec(
monitorA ! WatchRemote(c, a)
monitorA ! HeartbeatTick
- expectMsg(Heartbeat)
+ expectMsg(ArteryHeartbeat)
monitorA.tell(heartbeatRspB, monitorB)
expectNoMsg(1 second)
monitorA ! HeartbeatTick
- expectMsg(Heartbeat)
+ expectMsg(ArteryHeartbeat)
monitorA.tell(heartbeatRspB, monitorB)
monitorA ! HeartbeatTick
- expectMsg(Heartbeat)
+ expectMsg(ArteryHeartbeat)
monitorA ! ReapUnreachableTick
p.expectNoMsg(1 second)
monitorA ! HeartbeatTick
- expectMsg(Heartbeat)
+ expectMsg(ArteryHeartbeat)
monitorA.tell(heartbeatRspB, monitorB)
monitorA ! HeartbeatTick
- expectMsg(Heartbeat)
+ expectMsg(ArteryHeartbeat)
monitorA ! ReapUnreachableTick
p.expectNoMsg(1 second)
q.expectNoMsg(1 second)
@@ -304,7 +304,7 @@ class RemoteWatcherSpec extends AkkaSpec(
within(10 seconds) {
awaitAssert {
monitorA ! HeartbeatTick
- expectMsg(Heartbeat)
+ expectMsg(ArteryHeartbeat)
// but no HeartbeatRsp
monitorA ! ReapUnreachableTick
p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(c.path.address))
diff --git a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala
index cc7ed61608..a702272148 100644
--- a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala
+++ b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala
@@ -52,11 +52,11 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi
val addressA = UniqueAddress(
RARP(system).provider.getDefaultAddress,
- AddressUidExtension(system).addressUid)
+ AddressUidExtension(system).longAddressUid)
val systemB = ActorSystem("systemB", system.settings.config)
val addressB = UniqueAddress(
RARP(systemB).provider.getDefaultAddress,
- AddressUidExtension(systemB).addressUid)
+ AddressUidExtension(systemB).longAddressUid)
val rootB = RootActorPath(addressB.address)
val matSettings = ActorMaterializerSettings(system).withFuzzing(true)
implicit val mat = ActorMaterializer(matSettings)(system)
diff --git a/akka-remote/src/test/scala/akka/remote/serialization/ArteryMessageSerializerSpec.scala b/akka-remote/src/test/scala/akka/remote/serialization/ArteryMessageSerializerSpec.scala
index bdf5333632..cf7d409c47 100644
--- a/akka-remote/src/test/scala/akka/remote/serialization/ArteryMessageSerializerSpec.scala
+++ b/akka-remote/src/test/scala/akka/remote/serialization/ArteryMessageSerializerSpec.scala
@@ -5,7 +5,7 @@
package akka.remote.serialization
import akka.actor._
-import akka.remote.UniqueAddress
+import akka.remote.{ RemoteWatcher, UniqueAddress }
import akka.remote.artery.OutboundHandshake.{ HandshakeReq, HandshakeRsp }
import akka.remote.artery.compress.CompressionProtocol.{ ActorRefCompressionAdvertisement, ActorRefCompressionAdvertisementAck, ClassManifestCompressionAdvertisement, ClassManifestCompressionAdvertisementAck }
import akka.remote.artery.compress.CompressionTable
@@ -30,7 +30,10 @@ class ArteryMessageSerializerSpec extends AkkaSpec {
"ClassManifestCompressionAdvertisementAck" → ClassManifestCompressionAdvertisementAck(uniqueAddress(), 23),
"SystemMessageDelivery.SystemMessageEnvelop" → SystemMessageDelivery.SystemMessageEnvelope("test", 1234567890123L, uniqueAddress()),
"SystemMessageDelivery.Ack" → SystemMessageDelivery.Ack(98765432109876L, uniqueAddress()),
- "SystemMessageDelivery.Nack" → SystemMessageDelivery.Nack(98765432109876L, uniqueAddress())).foreach {
+ "SystemMessageDelivery.Nack" → SystemMessageDelivery.Nack(98765432109876L, uniqueAddress()),
+ "RemoteWatcher.ArteryHeartbeat" → RemoteWatcher.ArteryHeartbeat,
+ "RemoteWatcher.ArteryHeartbeatRsp" → RemoteWatcher.ArteryHeartbeatRsp(Long.MaxValue)
+ ).foreach {
case (scenario, item) ⇒
s"resolve serializer for $scenario" in {
val serializer = SerializationExtension(system)
diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala
index 220bdbdd06..0b4bcfad89 100644
--- a/project/AkkaBuild.scala
+++ b/project/AkkaBuild.scala
@@ -168,19 +168,19 @@ object AkkaBuild extends Build {
id = "akka-cluster",
base = file("akka-cluster"),
dependencies = Seq(remote, remoteTests % "test->test" , testkit % "test->test")
- ).configs(MultiJvm).disablePlugins(ValidatePullRequest, MimaPlugin)
+ ).configs(MultiJvm).disablePlugins(ValidatePullRequest)
lazy val clusterMetrics = Project(
id = "akka-cluster-metrics",
base = file("akka-cluster-metrics"),
dependencies = Seq(cluster % "compile->compile;test->test;multi-jvm->multi-jvm", slf4j % "test->compile")
- ).configs(MultiJvm).disablePlugins(ValidatePullRequest, MimaPlugin)
+ ).configs(MultiJvm).disablePlugins(ValidatePullRequest)
lazy val clusterTools = Project(
id = "akka-cluster-tools",
base = file("akka-cluster-tools"),
dependencies = Seq(cluster % "compile->compile;test->test;multi-jvm->multi-jvm")
- ).configs(MultiJvm).disablePlugins(ValidatePullRequest, MimaPlugin)
+ ).configs(MultiJvm).disablePlugins(ValidatePullRequest)
lazy val clusterSharding = Project(
id = "akka-cluster-sharding",
@@ -191,13 +191,13 @@ object AkkaBuild extends Build {
// provided.
dependencies = Seq(cluster % "compile->compile;test->test;multi-jvm->multi-jvm",
persistence % "compile;test->provided", distributedData % "provided;test", clusterTools)
- ).configs(MultiJvm).disablePlugins(ValidatePullRequest, MimaPlugin)
+ ).configs(MultiJvm).disablePlugins(ValidatePullRequest)
lazy val distributedData = Project(
id = "akka-distributed-data-experimental",
base = file("akka-distributed-data"),
dependencies = Seq(cluster % "compile->compile;test->test;multi-jvm->multi-jvm")
- ).configs(MultiJvm).disablePlugins(ValidatePullRequest, MimaPlugin)
+ ).configs(MultiJvm).disablePlugins(ValidatePullRequest)
lazy val slf4j = Project(
id = "akka-slf4j",
@@ -215,7 +215,7 @@ object AkkaBuild extends Build {
id = "akka-persistence",
base = file("akka-persistence"),
dependencies = Seq(actor, testkit % "test->test", protobuf)
- ).disablePlugins(ValidatePullRequest, MimaPlugin)
+ ).disablePlugins(ValidatePullRequest)
lazy val persistenceQuery = Project(
id = "akka-persistence-query-experimental",
@@ -225,37 +225,37 @@ object AkkaBuild extends Build {
persistence % "compile;provided->provided;test->test",
testkit % "compile;test->test",
streamTestkit % "compile;test->test")
- ).disablePlugins(ValidatePullRequest, MimaPlugin)
+ ).disablePlugins(ValidatePullRequest)
lazy val persistenceTck = Project(
id = "akka-persistence-tck",
base = file("akka-persistence-tck"),
dependencies = Seq(persistence % "compile;provided->provided;test->test", testkit % "compile;test->test")
- ).disablePlugins(ValidatePullRequest, MimaPlugin)
+ ).disablePlugins(ValidatePullRequest)
lazy val persistenceShared = Project(
id = "akka-persistence-shared",
base = file("akka-persistence-shared"),
dependencies = Seq(persistence % "test->test", testkit % "test->test", remote % "test", protobuf)
- ).disablePlugins(ValidatePullRequest, MimaPlugin)
+ ).disablePlugins(ValidatePullRequest)
lazy val httpCore = Project(
id = "akka-http-core",
base = file("akka-http-core"),
dependencies = Seq(stream, parsing, streamTestkit % "test->test")
- ).disablePlugins(ValidatePullRequest, MimaPlugin)
+ ).disablePlugins(ValidatePullRequest)
lazy val http = Project(
id = "akka-http-experimental",
base = file("akka-http"),
dependencies = Seq(httpCore)
- ).disablePlugins(ValidatePullRequest, MimaPlugin)
+ ).disablePlugins(ValidatePullRequest)
lazy val httpTestkit = Project(
id = "akka-http-testkit",
base = file("akka-http-testkit"),
dependencies = Seq(http, streamTestkit)
- ).disablePlugins(ValidatePullRequest, MimaPlugin)
+ ).disablePlugins(ValidatePullRequest)
lazy val httpTests = Project(
id = "akka-http-tests",
@@ -263,12 +263,12 @@ object AkkaBuild extends Build {
dependencies = Seq(
httpTestkit % "test", streamTestkit % "test->test", testkit % "test->test", httpSprayJson, httpXml, httpJackson,
multiNodeTestkit, remoteTests % "test->test") // required for multi-node latency/throughput Spec
- ).configs(MultiJvm).disablePlugins(ValidatePullRequest, MimaPlugin)
+ ).configs(MultiJvm).disablePlugins(ValidatePullRequest)
lazy val httpMarshallersScala = Project(
id = "akka-http-marshallers-scala-experimental",
base = file("akka-http-marshallers-scala")
- ).disablePlugins(ValidatePullRequest, MimaPlugin)
+ ).disablePlugins(ValidatePullRequest)
.settings(parentSettings: _*)
.aggregate(httpSprayJson, httpXml)
@@ -281,7 +281,7 @@ object AkkaBuild extends Build {
lazy val httpMarshallersJava = Project(
id = "akka-http-marshallers-java-experimental",
base = file("akka-http-marshallers-java")
- ).disablePlugins(ValidatePullRequest, MimaPlugin)
+ ).disablePlugins(ValidatePullRequest)
.settings(parentSettings: _*)
.aggregate(httpJackson)
@@ -293,61 +293,61 @@ object AkkaBuild extends Build {
id = s"akka-http-$name-experimental",
base = file(s"akka-http-marshallers-scala/akka-http-$name"),
dependencies = Seq(http)
- ).disablePlugins(ValidatePullRequest, MimaPlugin)
+ ).disablePlugins(ValidatePullRequest)
def httpMarshallersJavaSubproject(name: String) =
Project(
id = s"akka-http-$name-experimental",
base = file(s"akka-http-marshallers-java/akka-http-$name"),
dependencies = Seq(http)
- ).disablePlugins(ValidatePullRequest, MimaPlugin)
+ ).disablePlugins(ValidatePullRequest)
lazy val parsing = Project(
id = "akka-parsing",
base = file("akka-parsing")
- ).disablePlugins(ValidatePullRequest, MimaPlugin)
+ ).disablePlugins(ValidatePullRequest)
lazy val stream = Project(
id = "akka-stream",
base = file("akka-stream"),
dependencies = Seq(actor)
- ).disablePlugins(ValidatePullRequest, MimaPlugin)
+ ).disablePlugins(ValidatePullRequest)
lazy val streamTestkit = Project(
id = "akka-stream-testkit",
base = file("akka-stream-testkit"),
dependencies = Seq(stream, testkit % "compile;test->test")
- ).disablePlugins(ValidatePullRequest, MimaPlugin)
+ ).disablePlugins(ValidatePullRequest)
lazy val streamTests = Project(
id = "akka-stream-tests",
base = file("akka-stream-tests"),
dependencies = Seq(streamTestkit % "test->test", stream)
- ).disablePlugins(ValidatePullRequest, MimaPlugin)
+ ).disablePlugins(ValidatePullRequest)
lazy val streamTestsTck = Project(
id = "akka-stream-tests-tck",
base = file("akka-stream-tests-tck"),
dependencies = Seq(streamTestkit % "test->test", stream)
- ).disablePlugins(ValidatePullRequest, MimaPlugin)
+ ).disablePlugins(ValidatePullRequest)
lazy val kernel = Project(
id = "akka-kernel",
base = file("akka-kernel"),
dependencies = Seq(actor, testkit % "test->test")
- ).disablePlugins(ValidatePullRequest, MimaPlugin)
+ ).disablePlugins(ValidatePullRequest)
lazy val camel = Project(
id = "akka-camel",
base = file("akka-camel"),
dependencies = Seq(actor, slf4j, testkit % "test->test")
- ).disablePlugins(ValidatePullRequest, MimaPlugin)
+ ).disablePlugins(ValidatePullRequest)
lazy val osgi = Project(
id = "akka-osgi",
base = file("akka-osgi"),
dependencies = Seq(actor)
- ).disablePlugins(ValidatePullRequest, MimaPlugin)
+ ).disablePlugins(ValidatePullRequest)
lazy val docs = Project(
id = "akka-docs",
@@ -368,7 +368,7 @@ object AkkaBuild extends Build {
id = "akka-contrib",
base = file("akka-contrib"),
dependencies = Seq(remote, remoteTests % "test->test", cluster, clusterTools, persistence % "compile;test->provided")
- ).configs(MultiJvm).disablePlugins(ValidatePullRequest, MimaPlugin)
+ ).configs(MultiJvm).disablePlugins(ValidatePullRequest)
lazy val samplesSettings = parentSettings ++ ActivatorDist.settings
diff --git a/project/MiMa.scala b/project/MiMa.scala
index 3d896c3d31..59c076e4c3 100644
--- a/project/MiMa.scala
+++ b/project/MiMa.scala
@@ -981,7 +981,13 @@ object MiMa extends AutoPlugin {
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.ws.TextMessage.asScala"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.ws.TextMessage.getStreamedText"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.ws.BinaryMessage.asScala"),
- ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.ws.BinaryMessage.getStreamedData")
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.ws.BinaryMessage.getStreamedData"),
+
+ // #20644 long uids
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.protobuf.msg.ClusterMessages#UniqueAddressOrBuilder.hasUid2"),
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.protobuf.msg.ClusterMessages#UniqueAddressOrBuilder.getUid2"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.remote.RemoteWatcher.receiveHeartbeatRsp"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.remote.RemoteWatcher.selfHeartbeatRspMsg")
)
)
}