diff --git a/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java b/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java index f3a07fa363..3c7b093856 100644 --- a/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java +++ b/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java @@ -7058,6 +7058,17 @@ public final class ClusterMessages { * required .VectorClock version = 3; */ akka.cluster.protobuf.msg.ClusterMessages.VectorClockOrBuilder getVersionOrBuilder(); + + /** + * optional bytes seenDigest = 4; + * @return Whether the seenDigest field is set. + */ + boolean hasSeenDigest(); + /** + * optional bytes seenDigest = 4; + * @return The seenDigest. + */ + akka.protobufv3.internal.ByteString getSeenDigest(); } /** *
@@ -7078,6 +7089,7 @@ public final class ClusterMessages {
     }
     private GossipStatus() {
       allHashes_ = akka.protobufv3.internal.LazyStringArrayList.EMPTY;
+      seenDigest_ = akka.protobufv3.internal.ByteString.EMPTY;
     }
 
     @java.lang.Override
@@ -7146,6 +7158,11 @@ public final class ClusterMessages {
               bitField0_ |= 0x00000002;
               break;
             }
+            case 34: {
+              bitField0_ |= 0x00000004;
+              seenDigest_ = input.readBytes();
+              break;
+            }
             default: {
               if (!parseUnknownField(
                   input, unknownFields, extensionRegistry, tag)) {
@@ -7263,6 +7280,23 @@ public final class ClusterMessages {
       return version_ == null ? akka.cluster.protobuf.msg.ClusterMessages.VectorClock.getDefaultInstance() : version_;
     }
 
+    public static final int SEENDIGEST_FIELD_NUMBER = 4;
+    private akka.protobufv3.internal.ByteString seenDigest_;
+    /**
+     * optional bytes seenDigest = 4;
+     * @return Whether the seenDigest field is set.
+     */
+    public boolean hasSeenDigest() {
+      return ((bitField0_ & 0x00000004) != 0);
+    }
+    /**
+     * optional bytes seenDigest = 4;
+     * @return The seenDigest.
+     */
+    public akka.protobufv3.internal.ByteString getSeenDigest() {
+      return seenDigest_;
+    }
+
     private byte memoizedIsInitialized = -1;
     @java.lang.Override
     public final boolean isInitialized() {
@@ -7302,6 +7336,9 @@ public final class ClusterMessages {
       if (((bitField0_ & 0x00000002) != 0)) {
         output.writeMessage(3, getVersion());
       }
+      if (((bitField0_ & 0x00000004) != 0)) {
+        output.writeBytes(4, seenDigest_);
+      }
       unknownFields.writeTo(output);
     }
 
@@ -7327,6 +7364,10 @@ public final class ClusterMessages {
         size += akka.protobufv3.internal.CodedOutputStream
           .computeMessageSize(3, getVersion());
       }
+      if (((bitField0_ & 0x00000004) != 0)) {
+        size += akka.protobufv3.internal.CodedOutputStream
+          .computeBytesSize(4, seenDigest_);
+      }
       size += unknownFields.getSerializedSize();
       memoizedSize = size;
       return size;
@@ -7354,6 +7395,11 @@ public final class ClusterMessages {
         if (!getVersion()
             .equals(other.getVersion())) return false;
       }
+      if (hasSeenDigest() != other.hasSeenDigest()) return false;
+      if (hasSeenDigest()) {
+        if (!getSeenDigest()
+            .equals(other.getSeenDigest())) return false;
+      }
       if (!unknownFields.equals(other.unknownFields)) return false;
       return true;
     }
@@ -7377,6 +7423,10 @@ public final class ClusterMessages {
         hash = (37 * hash) + VERSION_FIELD_NUMBER;
         hash = (53 * hash) + getVersion().hashCode();
       }
+      if (hasSeenDigest()) {
+        hash = (37 * hash) + SEENDIGEST_FIELD_NUMBER;
+        hash = (53 * hash) + getSeenDigest().hashCode();
+      }
       hash = (29 * hash) + unknownFields.hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -7531,6 +7581,8 @@ public final class ClusterMessages {
           versionBuilder_.clear();
         }
         bitField0_ = (bitField0_ & ~0x00000004);
+        seenDigest_ = akka.protobufv3.internal.ByteString.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000008);
         return this;
       }
 
@@ -7580,6 +7632,10 @@ public final class ClusterMessages {
           }
           to_bitField0_ |= 0x00000002;
         }
+        if (((from_bitField0_ & 0x00000008) != 0)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.seenDigest_ = seenDigest_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -7645,6 +7701,9 @@ public final class ClusterMessages {
         if (other.hasVersion()) {
           mergeVersion(other.getVersion());
         }
+        if (other.hasSeenDigest()) {
+          setSeenDigest(other.getSeenDigest());
+        }
         this.mergeUnknownFields(other.unknownFields);
         onChanged();
         return this;
@@ -8035,6 +8094,46 @@ public final class ClusterMessages {
         }
         return versionBuilder_;
       }
+
+      private akka.protobufv3.internal.ByteString seenDigest_ = akka.protobufv3.internal.ByteString.EMPTY;
+      /**
+       * optional bytes seenDigest = 4;
+       * @return Whether the seenDigest field is set.
+       */
+      public boolean hasSeenDigest() {
+        return ((bitField0_ & 0x00000008) != 0);
+      }
+      /**
+       * optional bytes seenDigest = 4;
+       * @return The seenDigest.
+       */
+      public akka.protobufv3.internal.ByteString getSeenDigest() {
+        return seenDigest_;
+      }
+      /**
+       * optional bytes seenDigest = 4;
+       * @param value The seenDigest to set.
+       * @return This builder for chaining.
+       */
+      public Builder setSeenDigest(akka.protobufv3.internal.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000008;
+        seenDigest_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * optional bytes seenDigest = 4;
+       * @return This builder for chaining.
+       */
+      public Builder clearSeenDigest() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        seenDigest_ = getDefaultInstance().getSeenDigest();
+        onChanged();
+        return this;
+      }
       @java.lang.Override
       public final Builder setUnknownFields(
           final akka.protobufv3.internal.UnknownFieldSet unknownFields) {
@@ -21819,46 +21918,46 @@ public final class ClusterMessages {
       "iqueAddress\022\022\n\nsequenceNr\030\002 \001(\003\022\024\n\014creat" +
       "ionTime\030\003 \001(\003\"d\n\016GossipEnvelope\022\034\n\004from\030" +
       "\001 \002(\0132\016.UniqueAddress\022\032\n\002to\030\002 \002(\0132\016.Uniq" +
-      "ueAddress\022\030\n\020serializedGossip\030\003 \002(\014\"^\n\014G" +
+      "ueAddress\022\030\n\020serializedGossip\030\003 \002(\014\"r\n\014G" +
       "ossipStatus\022\034\n\004from\030\001 \002(\0132\016.UniqueAddres" +
       "s\022\021\n\tallHashes\030\002 \003(\t\022\035\n\007version\030\003 \002(\0132\014." +
-      "VectorClock\"\317\001\n\006Gossip\022$\n\014allAddresses\030\001" +
-      " \003(\0132\016.UniqueAddress\022\020\n\010allRoles\030\002 \003(\t\022\021" +
-      "\n\tallHashes\030\003 \003(\t\022\030\n\007members\030\004 \003(\0132\007.Mem" +
-      "ber\022!\n\010overview\030\005 \002(\0132\017.GossipOverview\022\035" +
-      "\n\007version\030\006 \002(\0132\014.VectorClock\022\036\n\ntombsto" +
-      "nes\030\007 \003(\0132\n.Tombstone\"S\n\016GossipOverview\022" +
-      "\014\n\004seen\030\001 \003(\005\0223\n\024observerReachability\030\002 " +
-      "\003(\0132\025.ObserverReachability\"p\n\024ObserverRe" +
-      "achability\022\024\n\014addressIndex\030\001 \002(\005\022\017\n\007vers" +
-      "ion\030\004 \002(\003\0221\n\023subjectReachability\030\002 \003(\0132\024" +
-      ".SubjectReachability\"a\n\023SubjectReachabil" +
-      "ity\022\024\n\014addressIndex\030\001 \002(\005\022#\n\006status\030\003 \002(" +
-      "\0162\023.ReachabilityStatus\022\017\n\007version\030\004 \002(\003\"" +
-      "4\n\tTombstone\022\024\n\014addressIndex\030\001 \002(\005\022\021\n\tti" +
-      "mestamp\030\002 \002(\003\"i\n\006Member\022\024\n\014addressIndex\030" +
-      "\001 \002(\005\022\020\n\010upNumber\030\002 \002(\005\022\035\n\006status\030\003 \002(\0162" +
-      "\r.MemberStatus\022\030\n\014rolesIndexes\030\004 \003(\005B\002\020\001" +
-      "\"y\n\013VectorClock\022\021\n\ttimestamp\030\001 \001(\003\022&\n\010ve" +
-      "rsions\030\002 \003(\0132\024.VectorClock.Version\032/\n\007Ve" +
-      "rsion\022\021\n\thashIndex\030\001 \002(\005\022\021\n\ttimestamp\030\002 " +
-      "\002(\003\"\007\n\005Empty\"K\n\007Address\022\016\n\006system\030\001 \002(\t\022" +
-      "\020\n\010hostname\030\002 \002(\t\022\014\n\004port\030\003 \002(\r\022\020\n\010proto" +
-      "col\030\004 \001(\t\"E\n\rUniqueAddress\022\031\n\007address\030\001 " +
-      "\002(\0132\010.Address\022\013\n\003uid\030\002 \002(\r\022\014\n\004uid2\030\003 \001(\r" +
-      "\"V\n\021ClusterRouterPool\022\023\n\004pool\030\001 \002(\0132\005.Po" +
-      "ol\022,\n\010settings\030\002 \002(\0132\032.ClusterRouterPool" +
-      "Settings\"<\n\004Pool\022\024\n\014serializerId\030\001 \002(\r\022\020" +
-      "\n\010manifest\030\002 \002(\t\022\014\n\004data\030\003 \002(\014\"\216\001\n\031Clust" +
-      "erRouterPoolSettings\022\026\n\016totalInstances\030\001" +
-      " \002(\r\022\033\n\023maxInstancesPerNode\030\002 \002(\r\022\031\n\021all" +
-      "owLocalRoutees\030\003 \002(\010\022\017\n\007useRole\030\004 \001(\t\022\020\n" +
-      "\010useRoles\030\005 \003(\t*D\n\022ReachabilityStatus\022\r\n" +
-      "\tReachable\020\000\022\017\n\013Unreachable\020\001\022\016\n\nTermina" +
-      "ted\020\002*b\n\014MemberStatus\022\013\n\007Joining\020\000\022\006\n\002Up" +
-      "\020\001\022\013\n\007Leaving\020\002\022\013\n\007Exiting\020\003\022\010\n\004Down\020\004\022\013" +
-      "\n\007Removed\020\005\022\014\n\010WeaklyUp\020\006B\035\n\031akka.cluste" +
-      "r.protobuf.msgH\001"
+      "VectorClock\022\022\n\nseenDigest\030\004 \001(\014\"\317\001\n\006Goss" +
+      "ip\022$\n\014allAddresses\030\001 \003(\0132\016.UniqueAddress" +
+      "\022\020\n\010allRoles\030\002 \003(\t\022\021\n\tallHashes\030\003 \003(\t\022\030\n" +
+      "\007members\030\004 \003(\0132\007.Member\022!\n\010overview\030\005 \002(" +
+      "\0132\017.GossipOverview\022\035\n\007version\030\006 \002(\0132\014.Ve" +
+      "ctorClock\022\036\n\ntombstones\030\007 \003(\0132\n.Tombston" +
+      "e\"S\n\016GossipOverview\022\014\n\004seen\030\001 \003(\005\0223\n\024obs" +
+      "erverReachability\030\002 \003(\0132\025.ObserverReacha" +
+      "bility\"p\n\024ObserverReachability\022\024\n\014addres" +
+      "sIndex\030\001 \002(\005\022\017\n\007version\030\004 \002(\003\0221\n\023subject" +
+      "Reachability\030\002 \003(\0132\024.SubjectReachability" +
+      "\"a\n\023SubjectReachability\022\024\n\014addressIndex\030" +
+      "\001 \002(\005\022#\n\006status\030\003 \002(\0162\023.ReachabilityStat" +
+      "us\022\017\n\007version\030\004 \002(\003\"4\n\tTombstone\022\024\n\014addr" +
+      "essIndex\030\001 \002(\005\022\021\n\ttimestamp\030\002 \002(\003\"i\n\006Mem" +
+      "ber\022\024\n\014addressIndex\030\001 \002(\005\022\020\n\010upNumber\030\002 " +
+      "\002(\005\022\035\n\006status\030\003 \002(\0162\r.MemberStatus\022\030\n\014ro" +
+      "lesIndexes\030\004 \003(\005B\002\020\001\"y\n\013VectorClock\022\021\n\tt" +
+      "imestamp\030\001 \001(\003\022&\n\010versions\030\002 \003(\0132\024.Vecto" +
+      "rClock.Version\032/\n\007Version\022\021\n\thashIndex\030\001" +
+      " \002(\005\022\021\n\ttimestamp\030\002 \002(\003\"\007\n\005Empty\"K\n\007Addr" +
+      "ess\022\016\n\006system\030\001 \002(\t\022\020\n\010hostname\030\002 \002(\t\022\014\n" +
+      "\004port\030\003 \002(\r\022\020\n\010protocol\030\004 \001(\t\"E\n\rUniqueA" +
+      "ddress\022\031\n\007address\030\001 \002(\0132\010.Address\022\013\n\003uid" +
+      "\030\002 \002(\r\022\014\n\004uid2\030\003 \001(\r\"V\n\021ClusterRouterPoo" +
+      "l\022\023\n\004pool\030\001 \002(\0132\005.Pool\022,\n\010settings\030\002 \002(\013" +
+      "2\032.ClusterRouterPoolSettings\"<\n\004Pool\022\024\n\014" +
+      "serializerId\030\001 \002(\r\022\020\n\010manifest\030\002 \002(\t\022\014\n\004" +
+      "data\030\003 \002(\014\"\216\001\n\031ClusterRouterPoolSettings" +
+      "\022\026\n\016totalInstances\030\001 \002(\r\022\033\n\023maxInstances" +
+      "PerNode\030\002 \002(\r\022\031\n\021allowLocalRoutees\030\003 \002(\010" +
+      "\022\017\n\007useRole\030\004 \001(\t\022\020\n\010useRoles\030\005 \003(\t*D\n\022R" +
+      "eachabilityStatus\022\r\n\tReachable\020\000\022\017\n\013Unre" +
+      "achable\020\001\022\016\n\nTerminated\020\002*b\n\014MemberStatu" +
+      "s\022\013\n\007Joining\020\000\022\006\n\002Up\020\001\022\013\n\007Leaving\020\002\022\013\n\007E" +
+      "xiting\020\003\022\010\n\004Down\020\004\022\013\n\007Removed\020\005\022\014\n\010Weakl" +
+      "yUp\020\006B\035\n\031akka.cluster.protobuf.msgH\001"
     };
     descriptor = akka.protobufv3.internal.Descriptors.FileDescriptor
       .internalBuildGeneratedFileFrom(descriptorData,
@@ -21917,7 +22016,7 @@ public final class ClusterMessages {
     internal_static_GossipStatus_fieldAccessorTable = new
       akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable(
         internal_static_GossipStatus_descriptor,
-        new java.lang.String[] { "From", "AllHashes", "Version", });
+        new java.lang.String[] { "From", "AllHashes", "Version", "SeenDigest", });
     internal_static_Gossip_descriptor =
       getDescriptor().getMessageTypes().get(9);
     internal_static_Gossip_fieldAccessorTable = new
diff --git a/akka-cluster/src/main/mima-filters/2.6.8.backwards.excludes/issue-29280-GossipStatus.excludes b/akka-cluster/src/main/mima-filters/2.6.8.backwards.excludes/issue-29280-GossipStatus.excludes
new file mode 100644
index 0000000000..d6d74879fd
--- /dev/null
+++ b/akka-cluster/src/main/mima-filters/2.6.8.backwards.excludes/issue-29280-GossipStatus.excludes
@@ -0,0 +1,3 @@
+# #29280
+ProblemFilters.exclude[Problem]("akka.cluster.GossipStatus*")
+ProblemFilters.exclude[Problem]("akka.cluster.protobuf.msg.ClusterMessages*")
diff --git a/akka-cluster/src/main/protobuf/ClusterMessages.proto b/akka-cluster/src/main/protobuf/ClusterMessages.proto
index 7409d2ee55..04a77f2e6a 100644
--- a/akka-cluster/src/main/protobuf/ClusterMessages.proto
+++ b/akka-cluster/src/main/protobuf/ClusterMessages.proto
@@ -119,6 +119,7 @@ message GossipStatus {
   required UniqueAddress from = 1;
   repeated string allHashes = 2;
   required VectorClock version = 3;
+  optional bytes seenDigest = 4;
 }
 
 /**
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
index 7dd05fe381..03e52848a6 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
@@ -970,10 +970,20 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
     else if (!latestGossip.isReachable(selfUniqueAddress, from))
       gossipLogger.logInfo("Ignoring received gossip status from unreachable [{}] ", from)
     else {
-      status.version.compareTo(latestGossip.version) match {
-        case VectorClock.Same  => // same version
-        case VectorClock.After => gossipStatusTo(from, sender()) // remote is newer
-        case _                 => gossipTo(from, sender()) // conflicting or local is newer
+      val localSeenDigest = latestGossip.seenDigest
+      val seenSame =
+        if (status.seenDigest.isEmpty || localSeenDigest.isEmpty) true
+        else java.util.Arrays.equals(status.seenDigest, localSeenDigest)
+      if (!seenSame) {
+        gossipTo(from, sender())
+      } else {
+        status.version.compareTo(latestGossip.version) match {
+          case VectorClock.Same => // same version
+          case VectorClock.After =>
+            gossipStatusTo(from, sender()) // remote is newer
+          case _ =>
+            gossipTo(from, sender()) // conflicting or local is newer
+        }
       }
     }
   }
@@ -1492,11 +1502,11 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
 
   def gossipStatusTo(node: UniqueAddress, destination: ActorRef): Unit =
     if (membershipState.validNodeForGossip(node))
-      destination ! GossipStatus(selfUniqueAddress, latestGossip.version)
+      destination ! GossipStatus(selfUniqueAddress, latestGossip.version, latestGossip.seenDigest)
 
   def gossipStatusTo(node: UniqueAddress): Unit =
     if (membershipState.validNodeForGossip(node))
-      clusterCore(node.address) ! GossipStatus(selfUniqueAddress, latestGossip.version)
+      clusterCore(node.address) ! GossipStatus(selfUniqueAddress, latestGossip.version, latestGossip.seenDigest)
 
   def updateLatestGossip(gossip: Gossip): Unit = {
     // Updating the vclock version for the changes
diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala
index d045b43296..e7aef767af 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala
@@ -4,12 +4,14 @@
 
 package akka.cluster
 
+import java.nio.charset.StandardCharsets
+import java.security.MessageDigest
+
 import scala.collection.immutable
 import scala.concurrent.duration.Deadline
 
 import ClusterSettings.DataCenter
 import MemberStatus._
-
 import akka.annotation.InternalApi
 
 /**
@@ -267,6 +269,9 @@ private[cluster] final case class Gossip(
     else copy(tombstones = newTombstones)
   }
 
+  def seenDigest: Array[Byte] =
+    overview.seenDigest
+
   override def toString =
     s"Gossip(members = [${members.mkString(", ")}], overview = $overview, version = $version, tombstones = $tombstones)"
 }
@@ -280,6 +285,11 @@ private[cluster] final case class GossipOverview(
     seen: Set[UniqueAddress] = Set.empty,
     reachability: Reachability = Reachability.empty) {
 
+  lazy val seenDigest: Array[Byte] = {
+    val bytes = seen.toVector.sorted.map(node => node.address).mkString(",").getBytes(StandardCharsets.UTF_8)
+    MessageDigest.getInstance("SHA-1").digest(bytes)
+  }
+
   override def toString =
     s"GossipOverview(reachability = [$reachability], seen = [${seen.mkString(", ")}])"
 }
@@ -340,4 +350,16 @@ private[cluster] class GossipEnvelope private (
  * it replies with its `GossipStatus`. Same versions ends the chat immediately.
  */
 @SerialVersionUID(1L)
-private[cluster] final case class GossipStatus(from: UniqueAddress, version: VectorClock) extends ClusterMessage
+private[cluster] final case class GossipStatus(from: UniqueAddress, version: VectorClock, seenDigest: Array[Byte])
+    extends ClusterMessage {
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case other: GossipStatus =>
+        from == other.from && version == other.version && java.util.Arrays.equals(seenDigest, other.seenDigest)
+      case _ => false
+    }
+  }
+
+  override def toString: DataCenter =
+    f"GossipStatus($from,$version,${seenDigest.map(byte => f"$byte%02x").mkString("")})"
+}
diff --git a/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala b/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala
index f40da163f5..6c4a326d42 100644
--- a/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala
@@ -55,12 +55,17 @@ import akka.util.ccompat._
    */
   def convergence(exitingConfirmed: Set[UniqueAddress]): Boolean = {
 
+    // full convergence needed for first member in a secondary DC
+    val firstMemberInDc =
+      !members.exists(member => member.dataCenter == selfDc && convergenceMemberStatus(member.status))
+
     // If another member in the data center that is UP or LEAVING and has not seen this gossip or is exiting
-    // convergence cannot be reached
+    // convergence cannot be reached. For the first member in a secondary DC all members must have seen
+    // the gossip state.
     def memberHinderingConvergenceExists =
       members.exists(
         member =>
-          member.dataCenter == selfDc &&
+          (firstMemberInDc || member.dataCenter == selfDc) &&
           convergenceMemberStatus(member.status) &&
           !(latestGossip.seenByNode(member.uniqueAddress) || exitingConfirmed(member.uniqueAddress)))
 
diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala
index 869716685d..6714748d3e 100644
--- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala
@@ -510,6 +510,7 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem)
       .setFrom(uniqueAddressToProto(status.from))
       .addAllAllHashes(allHashes.asJava)
       .setVersion(vectorClockToProto(status.version, hashMapping))
+      .setSeenDigest(ByteString.copyFrom(status.seenDigest))
       .build()
   }
 
@@ -594,10 +595,15 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem)
       () => gossipFromProto(cm.Gossip.parseFrom(decompress(serializedGossip.toByteArray))))
   }
 
-  private def gossipStatusFromProto(status: cm.GossipStatus): GossipStatus =
+  private def gossipStatusFromProto(status: cm.GossipStatus): GossipStatus = {
+    val seenDigest =
+      if (status.hasSeenDigest) status.getSeenDigest.toByteArray
+      else Array.emptyByteArray
     GossipStatus(
       uniqueAddressFromProto(status.getFrom),
-      vectorClockFromProto(status.getVersion, status.getAllHashesList.asScala.toVector))
+      vectorClockFromProto(status.getVersion, status.getAllHashesList.asScala.toVector),
+      seenDigest)
+  }
 
   def deserializeClusterRouterPool(bytes: Array[Byte]): ClusterRouterPool = {
     val crp = cm.ClusterRouterPool.parseFrom(bytes)
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcJoin2Spec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcJoin2Spec.scala
new file mode 100644
index 0000000000..ea9135eb3f
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcJoin2Spec.scala
@@ -0,0 +1,143 @@
+/*
+ * Copyright (C) 2009-2020 Lightbend Inc. 
+ */
+
+package akka.cluster
+
+import scala.concurrent.duration._
+
+import com.typesafe.config.ConfigFactory
+
+import akka.actor.Address
+import akka.cluster.ClusterEvent.InitialStateAsEvents
+import akka.cluster.ClusterEvent.MemberUp
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.testkit._
+
+// Similar to MultiDcJoinSpec, but slightly different scenario
+object MultiDcJoin2MultiJvmSpec extends MultiNodeConfig {
+  val first = role("first")
+  val second = role("second")
+  val third = role("third")
+  val fourth = role("fourth")
+  val fifth = role("fifth")
+
+  nodeConfig(first, second, third)(ConfigFactory.parseString("""
+    akka {
+      cluster.multi-data-center.self-data-center = alpha
+    }
+    """))
+
+  nodeConfig(fourth, fifth)(ConfigFactory.parseString("""
+    akka {
+      cluster.multi-data-center.self-data-center = beta
+    }
+    """))
+
+  commonConfig(ConfigFactory.parseString("""
+    akka {
+      actor.provider = cluster
+
+      loggers = ["akka.testkit.TestEventListener"]
+      loglevel = INFO
+
+      remote.log-remote-lifecycle-events = off
+
+      cluster {
+        debug.verbose-heartbeat-logging = off
+        debug.verbose-gossip-logging = off
+
+        multi-data-center {
+          cross-data-center-connections = 1
+        }
+      }
+    }
+    """))
+
+}
+
+class MultiDcJoin2MultiJvmNode1 extends MultiDcJoin2Spec
+class MultiDcJoin2MultiJvmNode2 extends MultiDcJoin2Spec
+class MultiDcJoin2MultiJvmNode3 extends MultiDcJoin2Spec
+class MultiDcJoin2MultiJvmNode4 extends MultiDcJoin2Spec
+class MultiDcJoin2MultiJvmNode5 extends MultiDcJoin2Spec
+
+abstract class MultiDcJoin2Spec extends MultiNodeSpec(MultiDcJoin2MultiJvmSpec) with MultiNodeClusterSpec {
+  import MultiDcJoin2MultiJvmSpec._
+
+  "Joining a multi-dc cluster, scenario 2" must {
+    "make sure oldest is selected correctly" taggedAs LongRunningTest in {
+
+      val observer = TestProbe("beta-observer")
+      runOn(fourth, fifth) {
+        Cluster(system).subscribe(observer.ref, InitialStateAsEvents, classOf[MemberUp])
+      }
+      val memberUpFromFifth = TestProbe("fromFifth")
+      runOn(fourth) {
+        system.actorOf(TestActors.forwardActorProps(memberUpFromFifth.ref), "fwFromFifth")
+      }
+
+      // all alpha nodes
+      awaitClusterUp(first, second, third)
+
+      runOn(fourth) {
+        Cluster(system).join(first)
+        within(20.seconds) {
+          awaitAssert {
+            Cluster(system).state.members
+              .exists(m => m.address == address(fourth) && m.status == MemberStatus.Up) should ===(true)
+          }
+        }
+      }
+
+      // at the same time join fifth, which is the difference compared to MultiDcJoinSpec
+      runOn(fifth) {
+        Cluster(system).join(second)
+        within(10.seconds) {
+          awaitAssert {
+            Cluster(system).state.members
+              .exists(m => m.address == address(fifth) && m.status == MemberStatus.Up) should ===(true)
+          }
+        }
+      }
+      enterBarrier("beta-joined")
+
+      runOn(fifth) {
+        val events = observer
+          .receiveN(5)
+          .map(_.asInstanceOf[MemberUp])
+          .filter(_.member.dataCenter == "beta")
+          .sortBy(_.member.upNumber)
+
+        events.head.member.upNumber should ===(1)
+        events(1).member.upNumber should ===(2)
+
+        // only sending Address since it's serializable
+        events.foreach(evt => system.actorSelection(node(fourth) / "user" / "fwFromFifth") ! evt.member.address)
+      }
+      enterBarrier("fw-from-fifth")
+
+      runOn(fourth) {
+        val events4 = observer
+          .receiveN(5)
+          .map(_.asInstanceOf[MemberUp])
+          .filter(_.member.dataCenter == "beta")
+          .sortBy(_.member.upNumber)
+
+        val upFromFifth = memberUpFromFifth.receiveN(2).map(_.asInstanceOf[Address])
+
+        events4.head.member.address should ===(upFromFifth.head)
+
+        events4.head.member.upNumber should ===(1)
+        events4(1).member.upNumber should ===(2)
+
+        observer.expectNoMessage(2.seconds)
+      }
+
+      enterBarrier("done")
+    }
+
+  }
+
+}
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcJoinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcJoinSpec.scala
new file mode 100644
index 0000000000..5b47b892ec
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcJoinSpec.scala
@@ -0,0 +1,129 @@
+/*
+ * Copyright (C) 2009-2020 Lightbend Inc. 
+ */
+
+package akka.cluster
+
+import scala.concurrent.duration._
+
+import com.typesafe.config.ConfigFactory
+
+import akka.cluster.ClusterEvent.InitialStateAsEvents
+import akka.cluster.ClusterEvent.MemberUp
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.testkit._
+
+// reproducer for issue #29280
+object MultiDcJoinMultiJvmSpec extends MultiNodeConfig {
+  val first = role("first")
+  val second = role("second")
+  val third = role("third")
+  val fourth = role("fourth")
+  val fifth = role("fifth")
+
+  nodeConfig(first, second, third)(ConfigFactory.parseString("""
+    akka {
+      cluster.multi-data-center.self-data-center = alpha
+    }
+    """))
+
+  nodeConfig(fourth, fifth)(ConfigFactory.parseString("""
+    akka {
+      cluster.multi-data-center.self-data-center = beta
+    }
+    """))
+
+  commonConfig(ConfigFactory.parseString("""
+    akka {
+      actor.provider = cluster
+
+      loggers = ["akka.testkit.TestEventListener"]
+      loglevel = INFO
+
+      remote.log-remote-lifecycle-events = off
+
+      cluster {
+        debug.verbose-heartbeat-logging = off
+        debug.verbose-gossip-logging = off
+
+        multi-data-center {
+          cross-data-center-connections = 1
+        }
+      }
+    }
+    """))
+
+}
+
+class MultiDcJoinMultiJvmNode1 extends MultiDcJoinSpec
+class MultiDcJoinMultiJvmNode2 extends MultiDcJoinSpec
+class MultiDcJoinMultiJvmNode3 extends MultiDcJoinSpec
+class MultiDcJoinMultiJvmNode4 extends MultiDcJoinSpec
+class MultiDcJoinMultiJvmNode5 extends MultiDcJoinSpec
+
+abstract class MultiDcJoinSpec extends MultiNodeSpec(MultiDcJoinMultiJvmSpec) with MultiNodeClusterSpec {
+  import MultiDcJoinMultiJvmSpec._
+
+  "Joining a multi-dc cluster" must {
+    "make sure oldest is selected correctly" taggedAs LongRunningTest in {
+      val fourthAddress = address(fourth)
+      val fifthAddress = address(fifth)
+      val sortedBetaAddresses = List(fourthAddress, fifthAddress).sorted
+      // beta1 has lower address than beta2 and will therefore be chosen as leader
+      // The problematic scenario in issue #29280 is triggered by beta2 joining first, moving itself to Up,
+      // and then beta1 joining and also moving itself to Up.
+      val (beta1, beta1Address, beta2, beta2Address) =
+        if (sortedBetaAddresses.head == fourthAddress) (fourth, fourthAddress, fifth, fifthAddress)
+        else (fifth, fifthAddress, fourth, fourthAddress)
+
+      val observer = TestProbe("beta-observer")
+      runOn(fourth, fifth) {
+        Cluster(system).subscribe(observer.ref, InitialStateAsEvents, classOf[MemberUp])
+      }
+
+      // all alpha nodes
+      awaitClusterUp(first, second, third)
+
+      runOn(beta2) {
+        Cluster(system).join(first)
+        within(20.seconds) {
+          awaitAssert {
+            Cluster(system).state.members
+              .exists(m => m.address == beta2Address && m.status == MemberStatus.Up) should ===(true)
+          }
+        }
+      }
+      enterBarrier("beta2-joined")
+
+      runOn(beta1) {
+        Cluster(system).join(second)
+        within(10.seconds) {
+          awaitAssert {
+            Cluster(system).state.members
+              .exists(m => m.address == beta1Address && m.status == MemberStatus.Up) should ===(true)
+          }
+        }
+      }
+      enterBarrier("beta1-joined")
+
+      runOn(fourth, fifth) {
+        val events = observer
+          .receiveN(5)
+          .map(_.asInstanceOf[MemberUp])
+          .filter(_.member.dataCenter == "beta")
+          .sortBy(_.member.upNumber)
+        events.head.member.address should ===(beta2Address)
+        events.head.member.upNumber should ===(1)
+        events(1).member.address should ===(beta1Address)
+        events(1).member.upNumber should ===(2)
+
+        observer.expectNoMessage(2.seconds)
+      }
+
+      enterBarrier("done")
+    }
+
+  }
+
+}
diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala
index b955604086..c57ec39849 100644
--- a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala
@@ -256,6 +256,22 @@ class GossipSpec extends AnyWordSpec with Matchers {
       state(g, dc2c1).convergence(Set.empty) should ===(false)
     }
 
+    "not reach convergence for first member of other data center until all have seen the gossip" in {
+      val dc2e1 = TestMember(e1.address, status = Joining, roles = Set.empty, dataCenter = "dc2")
+      val g = Gossip(members = SortedSet(dc1a1, dc1b1, dc2e1)).seen(dc1a1.uniqueAddress).seen(dc2e1.uniqueAddress)
+      // dc1b1 has not seen the gossip
+
+      // dc1 hasn't reached convergence because dc1b1 hasn't marked it as seen
+      state(g, dc1a1).convergence(Set.empty) should ===(false)
+
+      // and not dc2 because dc2e1 is only Joining
+      state(g, dc2e1).convergence(Set.empty) should ===(false)
+
+      // until all have seen it
+      val g2 = g.seen(dc1b1.uniqueAddress)
+      state(g2, dc2e1).convergence(Set.empty) should ===(true)
+    }
+
     "reach convergence per data center even if another data center contains unreachable" in {
       val r1 = Reachability.empty.unreachable(dc2c1.uniqueAddress, dc2d1.uniqueAddress)
 
diff --git a/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala
index e727a47c0e..a9edc15f1b 100644
--- a/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala
@@ -96,9 +96,9 @@ class ClusterMessageSerializerSpec extends AkkaSpec("akka.actor.provider = clust
       checkSerialization(GossipEnvelope(a1.uniqueAddress, uniqueAddress2, g3))
       checkSerialization(GossipEnvelope(a1.uniqueAddress, uniqueAddress2, g4))
 
-      checkSerialization(GossipStatus(a1.uniqueAddress, g1.version))
-      checkSerialization(GossipStatus(a1.uniqueAddress, g2.version))
-      checkSerialization(GossipStatus(a1.uniqueAddress, g3.version))
+      checkSerialization(GossipStatus(a1.uniqueAddress, g1.version, g1.seenDigest))
+      checkSerialization(GossipStatus(a1.uniqueAddress, g2.version, g2.seenDigest))
+      checkSerialization(GossipStatus(a1.uniqueAddress, g3.version, g3.seenDigest))
 
       checkSerialization(InternalClusterAction.Welcome(uniqueAddress, g2))
     }
@@ -141,7 +141,7 @@ class ClusterMessageSerializerSpec extends AkkaSpec("akka.actor.provider = clust
         ClusterMessageSerializer.OldGossipEnvelopeManifest)
 
       checkDeserializationWithManifest(
-        GossipStatus(a1.uniqueAddress, g1.version),
+        GossipStatus(a1.uniqueAddress, g1.version, g1.seenDigest),
         ClusterMessageSerializer.OldGossipStatusManifest)
 
       checkDeserializationWithManifest(