Harden multi-dc joining, #29280 (#29346)

* Harden multi-dc joining, #29280

* failing test MultiDcJoinSpec
* require that all members have seen the gossip before the first member in another DC can reach convergence
* the test also revealed that gossip wasn't propagated between DCs when
  the VectorClock was the same and only the seen set differed
* add a SHA-1 digest of the seen set to the GossipStatus to detect that the
  seen sets differ and that full gossip should be exchanged

* comments

* another test

* mima version
Patrik Nordwall 2020-08-07 18:02:31 +02:00 committed by GitHub
parent faada69ab4
commit 686729c75b
11 changed files with 489 additions and 55 deletions


@@ -7058,6 +7058,17 @@ public final class ClusterMessages {
* <code>required .VectorClock version = 3;</code>
*/
akka.cluster.protobuf.msg.ClusterMessages.VectorClockOrBuilder getVersionOrBuilder();
/**
* <code>optional bytes seenDigest = 4;</code>
* @return Whether the seenDigest field is set.
*/
boolean hasSeenDigest();
/**
* <code>optional bytes seenDigest = 4;</code>
* @return The seenDigest.
*/
akka.protobufv3.internal.ByteString getSeenDigest();
}
/**
* <pre>
@@ -7078,6 +7089,7 @@ public final class ClusterMessages {
}
private GossipStatus() {
allHashes_ = akka.protobufv3.internal.LazyStringArrayList.EMPTY;
seenDigest_ = akka.protobufv3.internal.ByteString.EMPTY;
}
@java.lang.Override
@@ -7146,6 +7158,11 @@ public final class ClusterMessages {
bitField0_ |= 0x00000002;
break;
}
case 34: {
bitField0_ |= 0x00000004;
seenDigest_ = input.readBytes();
break;
}
default: {
if (!parseUnknownField(
input, unknownFields, extensionRegistry, tag)) {
@@ -7263,6 +7280,23 @@ public final class ClusterMessages {
return version_ == null ? akka.cluster.protobuf.msg.ClusterMessages.VectorClock.getDefaultInstance() : version_;
}
public static final int SEENDIGEST_FIELD_NUMBER = 4;
private akka.protobufv3.internal.ByteString seenDigest_;
/**
* <code>optional bytes seenDigest = 4;</code>
* @return Whether the seenDigest field is set.
*/
public boolean hasSeenDigest() {
return ((bitField0_ & 0x00000004) != 0);
}
/**
* <code>optional bytes seenDigest = 4;</code>
* @return The seenDigest.
*/
public akka.protobufv3.internal.ByteString getSeenDigest() {
return seenDigest_;
}
private byte memoizedIsInitialized = -1;
@java.lang.Override
public final boolean isInitialized() {
@@ -7302,6 +7336,9 @@ public final class ClusterMessages {
if (((bitField0_ & 0x00000002) != 0)) {
output.writeMessage(3, getVersion());
}
if (((bitField0_ & 0x00000004) != 0)) {
output.writeBytes(4, seenDigest_);
}
unknownFields.writeTo(output);
}
@@ -7327,6 +7364,10 @@ public final class ClusterMessages {
size += akka.protobufv3.internal.CodedOutputStream
.computeMessageSize(3, getVersion());
}
if (((bitField0_ & 0x00000004) != 0)) {
size += akka.protobufv3.internal.CodedOutputStream
.computeBytesSize(4, seenDigest_);
}
size += unknownFields.getSerializedSize();
memoizedSize = size;
return size;
@@ -7354,6 +7395,11 @@ public final class ClusterMessages {
if (!getVersion()
.equals(other.getVersion())) return false;
}
if (hasSeenDigest() != other.hasSeenDigest()) return false;
if (hasSeenDigest()) {
if (!getSeenDigest()
.equals(other.getSeenDigest())) return false;
}
if (!unknownFields.equals(other.unknownFields)) return false;
return true;
}
@@ -7377,6 +7423,10 @@ public final class ClusterMessages {
hash = (37 * hash) + VERSION_FIELD_NUMBER;
hash = (53 * hash) + getVersion().hashCode();
}
if (hasSeenDigest()) {
hash = (37 * hash) + SEENDIGEST_FIELD_NUMBER;
hash = (53 * hash) + getSeenDigest().hashCode();
}
hash = (29 * hash) + unknownFields.hashCode();
memoizedHashCode = hash;
return hash;
@@ -7531,6 +7581,8 @@ public final class ClusterMessages {
versionBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000004);
seenDigest_ = akka.protobufv3.internal.ByteString.EMPTY;
bitField0_ = (bitField0_ & ~0x00000008);
return this;
}
@@ -7580,6 +7632,10 @@ public final class ClusterMessages {
}
to_bitField0_ |= 0x00000002;
}
if (((from_bitField0_ & 0x00000008) != 0)) {
to_bitField0_ |= 0x00000004;
}
result.seenDigest_ = seenDigest_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -7645,6 +7701,9 @@ public final class ClusterMessages {
if (other.hasVersion()) {
mergeVersion(other.getVersion());
}
if (other.hasSeenDigest()) {
setSeenDigest(other.getSeenDigest());
}
this.mergeUnknownFields(other.unknownFields);
onChanged();
return this;
@@ -8035,6 +8094,46 @@ public final class ClusterMessages {
}
return versionBuilder_;
}
private akka.protobufv3.internal.ByteString seenDigest_ = akka.protobufv3.internal.ByteString.EMPTY;
/**
* <code>optional bytes seenDigest = 4;</code>
* @return Whether the seenDigest field is set.
*/
public boolean hasSeenDigest() {
return ((bitField0_ & 0x00000008) != 0);
}
/**
* <code>optional bytes seenDigest = 4;</code>
* @return The seenDigest.
*/
public akka.protobufv3.internal.ByteString getSeenDigest() {
return seenDigest_;
}
/**
* <code>optional bytes seenDigest = 4;</code>
* @param value The seenDigest to set.
* @return This builder for chaining.
*/
public Builder setSeenDigest(akka.protobufv3.internal.ByteString value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000008;
seenDigest_ = value;
onChanged();
return this;
}
/**
* <code>optional bytes seenDigest = 4;</code>
* @return This builder for chaining.
*/
public Builder clearSeenDigest() {
bitField0_ = (bitField0_ & ~0x00000008);
seenDigest_ = getDefaultInstance().getSeenDigest();
onChanged();
return this;
}
@java.lang.Override
public final Builder setUnknownFields(
final akka.protobufv3.internal.UnknownFieldSet unknownFields) {
@@ -21819,46 +21918,46 @@ public final class ClusterMessages {
"iqueAddress\022\022\n\nsequenceNr\030\002 \001(\003\022\024\n\014creat" +
"ionTime\030\003 \001(\003\"d\n\016GossipEnvelope\022\034\n\004from\030" +
"\001 \002(\0132\016.UniqueAddress\022\032\n\002to\030\002 \002(\0132\016.Uniq" +
"ueAddress\022\030\n\020serializedGossip\030\003 \002(\014\"r\n\014G" +
"ossipStatus\022\034\n\004from\030\001 \002(\0132\016.UniqueAddres" +
"s\022\021\n\tallHashes\030\002 \003(\t\022\035\n\007version\030\003 \002(\0132\014." +
"VectorClock\022\022\n\nseenDigest\030\004 \001(\014\"\317\001\n\006Goss" +
"ip\022$\n\014allAddresses\030\001 \003(\0132\016.UniqueAddress" +
"\022\020\n\010allRoles\030\002 \003(\t\022\021\n\tallHashes\030\003 \003(\t\022\030\n" +
"\007members\030\004 \003(\0132\007.Member\022!\n\010overview\030\005 \002(" +
"\0132\017.GossipOverview\022\035\n\007version\030\006 \002(\0132\014.Ve" +
"ctorClock\022\036\n\ntombstones\030\007 \003(\0132\n.Tombston" +
"e\"S\n\016GossipOverview\022\014\n\004seen\030\001 \003(\005\0223\n\024obs" +
"erverReachability\030\002 \003(\0132\025.ObserverReacha" +
"bility\"p\n\024ObserverReachability\022\024\n\014addres" +
"sIndex\030\001 \002(\005\022\017\n\007version\030\004 \002(\003\0221\n\023subject" +
"Reachability\030\002 \003(\0132\024.SubjectReachability" +
"\"a\n\023SubjectReachability\022\024\n\014addressIndex\030" +
"\001 \002(\005\022#\n\006status\030\003 \002(\0162\023.ReachabilityStat" +
"us\022\017\n\007version\030\004 \002(\003\"4\n\tTombstone\022\024\n\014addr" +
"essIndex\030\001 \002(\005\022\021\n\ttimestamp\030\002 \002(\003\"i\n\006Mem" +
"ber\022\024\n\014addressIndex\030\001 \002(\005\022\020\n\010upNumber\030\002 " +
"\002(\005\022\035\n\006status\030\003 \002(\0162\r.MemberStatus\022\030\n\014ro" +
"lesIndexes\030\004 \003(\005B\002\020\001\"y\n\013VectorClock\022\021\n\tt" +
"imestamp\030\001 \001(\003\022&\n\010versions\030\002 \003(\0132\024.Vecto" +
"rClock.Version\032/\n\007Version\022\021\n\thashIndex\030\001" +
" \002(\005\022\021\n\ttimestamp\030\002 \002(\003\"\007\n\005Empty\"K\n\007Addr" +
"ess\022\016\n\006system\030\001 \002(\t\022\020\n\010hostname\030\002 \002(\t\022\014\n" +
"\004port\030\003 \002(\r\022\020\n\010protocol\030\004 \001(\t\"E\n\rUniqueA" +
"ddress\022\031\n\007address\030\001 \002(\0132\010.Address\022\013\n\003uid" +
"\030\002 \002(\r\022\014\n\004uid2\030\003 \001(\r\"V\n\021ClusterRouterPoo" +
"l\022\023\n\004pool\030\001 \002(\0132\005.Pool\022,\n\010settings\030\002 \002(\013" +
"2\032.ClusterRouterPoolSettings\"<\n\004Pool\022\024\n\014" +
"serializerId\030\001 \002(\r\022\020\n\010manifest\030\002 \002(\t\022\014\n\004" +
"data\030\003 \002(\014\"\216\001\n\031ClusterRouterPoolSettings" +
"\022\026\n\016totalInstances\030\001 \002(\r\022\033\n\023maxInstances" +
"PerNode\030\002 \002(\r\022\031\n\021allowLocalRoutees\030\003 \002(\010" +
"\022\017\n\007useRole\030\004 \001(\t\022\020\n\010useRoles\030\005 \003(\t*D\n\022R" +
"eachabilityStatus\022\r\n\tReachable\020\000\022\017\n\013Unre" +
"achable\020\001\022\016\n\nTerminated\020\002*b\n\014MemberStatu" +
"s\022\013\n\007Joining\020\000\022\006\n\002Up\020\001\022\013\n\007Leaving\020\002\022\013\n\007E" +
"xiting\020\003\022\010\n\004Down\020\004\022\013\n\007Removed\020\005\022\014\n\010Weakl" +
"yUp\020\006B\035\n\031akka.cluster.protobuf.msgH\001"
};
descriptor = akka.protobufv3.internal.Descriptors.FileDescriptor
.internalBuildGeneratedFileFrom(descriptorData,
@@ -21917,7 +22016,7 @@ public final class ClusterMessages {
internal_static_GossipStatus_fieldAccessorTable = new
akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable(
internal_static_GossipStatus_descriptor,
new java.lang.String[] { "From", "AllHashes", "Version", "SeenDigest", });
internal_static_Gossip_descriptor =
getDescriptor().getMessageTypes().get(9);
internal_static_Gossip_fieldAccessorTable = new


@@ -0,0 +1,3 @@
# #29280
ProblemFilters.exclude[Problem]("akka.cluster.GossipStatus*")
ProblemFilters.exclude[Problem]("akka.cluster.protobuf.msg.ClusterMessages*")


@@ -119,6 +119,7 @@ message GossipStatus {
required UniqueAddress from = 1;
repeated string allHashes = 2;
required VectorClock version = 3;
optional bytes seenDigest = 4;
}
/**


@@ -970,10 +970,20 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
else if (!latestGossip.isReachable(selfUniqueAddress, from))
gossipLogger.logInfo("Ignoring received gossip status from unreachable [{}] ", from)
else {
val localSeenDigest = latestGossip.seenDigest
val seenSame =
if (status.seenDigest.isEmpty || localSeenDigest.isEmpty) true
else java.util.Arrays.equals(status.seenDigest, localSeenDigest)
if (!seenSame) {
gossipTo(from, sender())
} else {
status.version.compareTo(latestGossip.version) match {
case VectorClock.Same => // same version
case VectorClock.After =>
gossipStatusTo(from, sender()) // remote is newer
case _ =>
gossipTo(from, sender()) // conflicting or local is newer
}
}
}
}
@@ -1492,11 +1502,11 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
def gossipStatusTo(node: UniqueAddress, destination: ActorRef): Unit =
if (membershipState.validNodeForGossip(node))
destination ! GossipStatus(selfUniqueAddress, latestGossip.version, latestGossip.seenDigest)
def gossipStatusTo(node: UniqueAddress): Unit =
if (membershipState.validNodeForGossip(node))
clusterCore(node.address) ! GossipStatus(selfUniqueAddress, latestGossip.version, latestGossip.seenDigest)
def updateLatestGossip(gossip: Gossip): Unit = {
// Updating the vclock version for the changes
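
The core of the change is the seen-digest comparison above: when two nodes have the same vector clock but different seen sets, full gossip is still exchanged. Below is a minimal standalone sketch of that decision with simplified stand-in types (ClockOrdering, Action and the method names are illustrative, not the daemon's actual signatures):

object GossipStatusDecisionSketch {
  // stand-in for akka.cluster.VectorClock's comparison result
  sealed trait ClockOrdering
  case object Same extends ClockOrdering
  case object After extends ClockOrdering
  case object ConcurrentOrBefore extends ClockOrdering

  sealed trait Action
  case object DoNothing extends Action
  case object SendStatus extends Action
  case object SendFullGossip extends Action

  // An empty digest (e.g. from a node running an older version that never
  // sends the field) disables the check and falls back to the old behaviour.
  def seenSame(remoteDigest: Array[Byte], localDigest: Array[Byte]): Boolean =
    remoteDigest.isEmpty || localDigest.isEmpty ||
      java.util.Arrays.equals(remoteDigest, localDigest)

  def decide(sameSeen: Boolean, clockComparison: ClockOrdering): Action =
    if (!sameSeen) SendFullGossip // seen sets differ, even if the clocks are the same
    else
      clockComparison match {
        case Same               => DoNothing      // nothing new on either side
        case After              => SendStatus     // remote is newer
        case ConcurrentOrBefore => SendFullGossip // conflicting or local is newer
      }
}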


@@ -4,12 +4,14 @@
package akka.cluster
import java.nio.charset.StandardCharsets
import java.security.MessageDigest
import scala.collection.immutable
import scala.concurrent.duration.Deadline
import ClusterSettings.DataCenter
import MemberStatus._
import akka.annotation.InternalApi
/**
@@ -267,6 +269,9 @@ private[cluster] final case class Gossip(
else copy(tombstones = newTombstones)
}
def seenDigest: Array[Byte] =
overview.seenDigest
override def toString =
s"Gossip(members = [${members.mkString(", ")}], overview = $overview, version = $version, tombstones = $tombstones)"
}
@@ -280,6 +285,11 @@ private[cluster] final case class GossipOverview(
seen: Set[UniqueAddress] = Set.empty,
reachability: Reachability = Reachability.empty) {
lazy val seenDigest: Array[Byte] = {
val bytes = seen.toVector.sorted.map(node => node.address).mkString(",").getBytes(StandardCharsets.UTF_8)
MessageDigest.getInstance("SHA-1").digest(bytes)
}
override def toString =
s"GossipOverview(reachability = [$reachability], seen = [${seen.mkString(", ")}])"
}
@@ -340,4 +350,16 @@ private[cluster] class GossipEnvelope private (
* it replies with its `GossipStatus`. Same versions ends the chat immediately.
*/
@SerialVersionUID(1L)
private[cluster] final case class GossipStatus(from: UniqueAddress, version: VectorClock, seenDigest: Array[Byte])
extends ClusterMessage {
override def equals(obj: Any): Boolean = {
obj match {
case other: GossipStatus =>
from == other.from && version == other.version && java.util.Arrays.equals(seenDigest, other.seenDigest)
case _ => false
}
}
override def toString: DataCenter =
f"GossipStatus($from,$version,${seenDigest.map(byte => f"$byte%02x").mkString("")})"
}
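
For reference, the digest added to GossipOverview above is just a SHA-1 over the sorted, comma-separated addresses of the nodes that have seen the gossip. A self-contained sketch of the same computation, using plain address strings instead of UniqueAddress:

import java.nio.charset.StandardCharsets
import java.security.MessageDigest

object SeenDigestSketch {
  // `addresses` stands in for `seen.toVector.sorted.map(_.address)` in GossipOverview
  def seenDigest(addresses: Vector[String]): Array[Byte] = {
    val bytes = addresses.sorted.mkString(",").getBytes(StandardCharsets.UTF_8)
    MessageDigest.getInstance("SHA-1").digest(bytes)
  }

  // two nodes with the same seen set produce the same 20-byte digest
  def sameSeen(a: Vector[String], b: Vector[String]): Boolean =
    java.util.Arrays.equals(seenDigest(a), seenDigest(b))
}

Sorting before hashing makes the digest independent of insertion order, so only the membership of the seen set matters.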


@@ -55,12 +55,17 @@ import akka.util.ccompat._
*/
def convergence(exitingConfirmed: Set[UniqueAddress]): Boolean = {
// full convergence needed for first member in a secondary DC
val firstMemberInDc =
!members.exists(member => member.dataCenter == selfDc && convergenceMemberStatus(member.status))
// If another member in the data center that is UP or LEAVING and has not seen this gossip or is exiting
// convergence cannot be reached. For the first member in a secondary DC all members must have seen
// the gossip state.
def memberHinderingConvergenceExists =
members.exists(
member =>
(firstMemberInDc || member.dataCenter == selfDc) &&
convergenceMemberStatus(member.status) &&
!(latestGossip.seenByNode(member.uniqueAddress) || exitingConfirmed(member.uniqueAddress)))
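
The tightened rule above can be read as: normally only members of the node's own data center have to see the gossip, but while the data center has no Up or Leaving member yet, every relevant member in the whole cluster has to see it. A simplified sketch with stand-in types (Node, countsForConvergence and seen are illustrative; the real code uses Member, convergenceMemberStatus and latestGossip.seenByNode):

object ConvergenceRuleSketch {
  // stand-in for Member: its data center, whether its status counts for
  // convergence (Up/Leaving in the real code) and whether it has seen the gossip
  final case class Node(dataCenter: String, countsForConvergence: Boolean, seen: Boolean)

  def noMemberHindersConvergence(selfDc: String, nodes: Set[Node]): Boolean = {
    // no Up/Leaving member in our DC yet => we are the first member joining this DC
    val firstMemberInDc = !nodes.exists(n => n.dataCenter == selfDc && n.countsForConvergence)
    !nodes.exists { n =>
      (firstMemberInDc || n.dataCenter == selfDc) &&
      n.countsForConvergence &&
      !n.seen
    }
  }
}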


@@ -510,6 +510,7 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem)
.setFrom(uniqueAddressToProto(status.from))
.addAllAllHashes(allHashes.asJava)
.setVersion(vectorClockToProto(status.version, hashMapping))
.setSeenDigest(ByteString.copyFrom(status.seenDigest))
.build()
}
@@ -594,10 +595,15 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem)
() => gossipFromProto(cm.Gossip.parseFrom(decompress(serializedGossip.toByteArray))))
}
private def gossipStatusFromProto(status: cm.GossipStatus): GossipStatus = {
val seenDigest =
if (status.hasSeenDigest) status.getSeenDigest.toByteArray
else Array.emptyByteArray
GossipStatus(
uniqueAddressFromProto(status.getFrom),
vectorClockFromProto(status.getVersion, status.getAllHashesList.asScala.toVector),
seenDigest)
}
def deserializeClusterRouterPool(bytes: Array[Byte]): ClusterRouterPool = {
val crp = cm.ClusterRouterPool.parseFrom(bytes)
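
Backwards compatibility is handled in the deserializer above: an older node never sets the optional seenDigest field, so it falls back to an empty array, which receiveGossipStatus treats as "skip the digest check". A hypothetical helper illustrating that fallback (not the serializer's real API):

object SeenDigestCompatSketch {
  // maps a possibly-missing protobuf bytes field to the Array[Byte] carried by
  // GossipStatus; absence (message from an old sender) becomes an empty array
  def seenDigestOrEmpty(hasField: Boolean, bytes: => Array[Byte]): Array[Byte] =
    if (hasField) bytes else Array.emptyByteArray
}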


@@ -0,0 +1,143 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.Address
import akka.cluster.ClusterEvent.InitialStateAsEvents
import akka.cluster.ClusterEvent.MemberUp
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
// Similar to MultiDcJoinSpec, but slightly different scenario
object MultiDcJoin2MultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
val fifth = role("fifth")
nodeConfig(first, second, third)(ConfigFactory.parseString("""
akka {
cluster.multi-data-center.self-data-center = alpha
}
"""))
nodeConfig(fourth, fifth)(ConfigFactory.parseString("""
akka {
cluster.multi-data-center.self-data-center = beta
}
"""))
commonConfig(ConfigFactory.parseString("""
akka {
actor.provider = cluster
loggers = ["akka.testkit.TestEventListener"]
loglevel = INFO
remote.log-remote-lifecycle-events = off
cluster {
debug.verbose-heartbeat-logging = off
debug.verbose-gossip-logging = off
multi-data-center {
cross-data-center-connections = 1
}
}
}
"""))
}
class MultiDcJoin2MultiJvmNode1 extends MultiDcJoin2Spec
class MultiDcJoin2MultiJvmNode2 extends MultiDcJoin2Spec
class MultiDcJoin2MultiJvmNode3 extends MultiDcJoin2Spec
class MultiDcJoin2MultiJvmNode4 extends MultiDcJoin2Spec
class MultiDcJoin2MultiJvmNode5 extends MultiDcJoin2Spec
abstract class MultiDcJoin2Spec extends MultiNodeSpec(MultiDcJoin2MultiJvmSpec) with MultiNodeClusterSpec {
import MultiDcJoin2MultiJvmSpec._
"Joining a multi-dc cluster, scenario 2" must {
"make sure oldest is selected correctly" taggedAs LongRunningTest in {
val observer = TestProbe("beta-observer")
runOn(fourth, fifth) {
Cluster(system).subscribe(observer.ref, InitialStateAsEvents, classOf[MemberUp])
}
val memberUpFromFifth = TestProbe("fromFifth")
runOn(fourth) {
system.actorOf(TestActors.forwardActorProps(memberUpFromFifth.ref), "fwFromFifth")
}
// all alpha nodes
awaitClusterUp(first, second, third)
runOn(fourth) {
Cluster(system).join(first)
within(20.seconds) {
awaitAssert {
Cluster(system).state.members
.exists(m => m.address == address(fourth) && m.status == MemberStatus.Up) should ===(true)
}
}
}
// at the same time join fifth, which is the difference compared to MultiDcJoinSpec
runOn(fifth) {
Cluster(system).join(second)
within(10.seconds) {
awaitAssert {
Cluster(system).state.members
.exists(m => m.address == address(fifth) && m.status == MemberStatus.Up) should ===(true)
}
}
}
enterBarrier("beta-joined")
runOn(fifth) {
val events = observer
.receiveN(5)
.map(_.asInstanceOf[MemberUp])
.filter(_.member.dataCenter == "beta")
.sortBy(_.member.upNumber)
events.head.member.upNumber should ===(1)
events(1).member.upNumber should ===(2)
// only sending Address since it's serializable
events.foreach(evt => system.actorSelection(node(fourth) / "user" / "fwFromFifth") ! evt.member.address)
}
enterBarrier("fw-from-fifth")
runOn(fourth) {
val events4 = observer
.receiveN(5)
.map(_.asInstanceOf[MemberUp])
.filter(_.member.dataCenter == "beta")
.sortBy(_.member.upNumber)
val upFromFifth = memberUpFromFifth.receiveN(2).map(_.asInstanceOf[Address])
events4.head.member.address should ===(upFromFifth.head)
events4.head.member.upNumber should ===(1)
events4(1).member.upNumber should ===(2)
observer.expectNoMessage(2.seconds)
}
enterBarrier("done")
}
}
}


@@ -0,0 +1,129 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.cluster.ClusterEvent.InitialStateAsEvents
import akka.cluster.ClusterEvent.MemberUp
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
// reproducer for issue #29280
object MultiDcJoinMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
val fifth = role("fifth")
nodeConfig(first, second, third)(ConfigFactory.parseString("""
akka {
cluster.multi-data-center.self-data-center = alpha
}
"""))
nodeConfig(fourth, fifth)(ConfigFactory.parseString("""
akka {
cluster.multi-data-center.self-data-center = beta
}
"""))
commonConfig(ConfigFactory.parseString("""
akka {
actor.provider = cluster
loggers = ["akka.testkit.TestEventListener"]
loglevel = INFO
remote.log-remote-lifecycle-events = off
cluster {
debug.verbose-heartbeat-logging = off
debug.verbose-gossip-logging = off
multi-data-center {
cross-data-center-connections = 1
}
}
}
"""))
}
class MultiDcJoinMultiJvmNode1 extends MultiDcJoinSpec
class MultiDcJoinMultiJvmNode2 extends MultiDcJoinSpec
class MultiDcJoinMultiJvmNode3 extends MultiDcJoinSpec
class MultiDcJoinMultiJvmNode4 extends MultiDcJoinSpec
class MultiDcJoinMultiJvmNode5 extends MultiDcJoinSpec
abstract class MultiDcJoinSpec extends MultiNodeSpec(MultiDcJoinMultiJvmSpec) with MultiNodeClusterSpec {
import MultiDcJoinMultiJvmSpec._
"Joining a multi-dc cluster" must {
"make sure oldest is selected correctly" taggedAs LongRunningTest in {
val fourthAddress = address(fourth)
val fifthAddress = address(fifth)
val sortedBetaAddresses = List(fourthAddress, fifthAddress).sorted
// beta1 has lower address than beta2 and will therefore be chosen as leader
// The problematic scenario in issue #29280 is triggered by beta2 joining first, moving itself to Up,
// and then beta1 joining and also moving itself to Up.
val (beta1, beta1Address, beta2, beta2Address) =
if (sortedBetaAddresses.head == fourthAddress) (fourth, fourthAddress, fifth, fifthAddress)
else (fifth, fifthAddress, fourth, fourthAddress)
val observer = TestProbe("beta-observer")
runOn(fourth, fifth) {
Cluster(system).subscribe(observer.ref, InitialStateAsEvents, classOf[MemberUp])
}
// all alpha nodes
awaitClusterUp(first, second, third)
runOn(beta2) {
Cluster(system).join(first)
within(20.seconds) {
awaitAssert {
Cluster(system).state.members
.exists(m => m.address == beta2Address && m.status == MemberStatus.Up) should ===(true)
}
}
}
enterBarrier("beta2-joined")
runOn(beta1) {
Cluster(system).join(second)
within(10.seconds) {
awaitAssert {
Cluster(system).state.members
.exists(m => m.address == beta1Address && m.status == MemberStatus.Up) should ===(true)
}
}
}
enterBarrier("beta1-joined")
runOn(fourth, fifth) {
val events = observer
.receiveN(5)
.map(_.asInstanceOf[MemberUp])
.filter(_.member.dataCenter == "beta")
.sortBy(_.member.upNumber)
events.head.member.address should ===(beta2Address)
events.head.member.upNumber should ===(1)
events(1).member.address should ===(beta1Address)
events(1).member.upNumber should ===(2)
observer.expectNoMessage(2.seconds)
}
enterBarrier("done")
}
}
}


@@ -256,6 +256,22 @@ class GossipSpec extends AnyWordSpec with Matchers {
state(g, dc2c1).convergence(Set.empty) should ===(false)
}
"not reach convergence for first member of other data center until all have seen the gossip" in {
val dc2e1 = TestMember(e1.address, status = Joining, roles = Set.empty, dataCenter = "dc2")
val g = Gossip(members = SortedSet(dc1a1, dc1b1, dc2e1)).seen(dc1a1.uniqueAddress).seen(dc2e1.uniqueAddress)
// dc1b1 has not seen the gossip
// dc1 hasn't reached convergence because dc1b1 hasn't marked it as seen
state(g, dc1a1).convergence(Set.empty) should ===(false)
// and not dc2 because dc2e1 is only Joining
state(g, dc2e1).convergence(Set.empty) should ===(false)
// until all have seen it
val g2 = g.seen(dc1b1.uniqueAddress)
state(g2, dc2e1).convergence(Set.empty) should ===(true)
}
"reach convergence per data center even if another data center contains unreachable" in { "reach convergence per data center even if another data center contains unreachable" in {
val r1 = Reachability.empty.unreachable(dc2c1.uniqueAddress, dc2d1.uniqueAddress) val r1 = Reachability.empty.unreachable(dc2c1.uniqueAddress, dc2d1.uniqueAddress)


@@ -96,9 +96,9 @@ class ClusterMessageSerializerSpec extends AkkaSpec("akka.actor.provider = clust
checkSerialization(GossipEnvelope(a1.uniqueAddress, uniqueAddress2, g3))
checkSerialization(GossipEnvelope(a1.uniqueAddress, uniqueAddress2, g4))
checkSerialization(GossipStatus(a1.uniqueAddress, g1.version, g1.seenDigest))
checkSerialization(GossipStatus(a1.uniqueAddress, g2.version, g2.seenDigest))
checkSerialization(GossipStatus(a1.uniqueAddress, g3.version, g3.seenDigest))
checkSerialization(InternalClusterAction.Welcome(uniqueAddress, g2))
}
@@ -141,7 +141,7 @@ class ClusterMessageSerializerSpec extends AkkaSpec("akka.actor.provider = clust
ClusterMessageSerializer.OldGossipEnvelopeManifest)
checkDeserializationWithManifest(
GossipStatus(a1.uniqueAddress, g1.version, g1.seenDigest),
ClusterMessageSerializer.OldGossipStatusManifest)
checkDeserializationWithManifest(