diff --git a/akka-cluster/src/main/protobuf/ClusterMessages.proto b/akka-cluster/src/main/protobuf/ClusterMessages.proto index a2575b0a76..630a2f8024 100644 --- a/akka-cluster/src/main/protobuf/ClusterMessages.proto +++ b/akka-cluster/src/main/protobuf/ClusterMessages.proto @@ -112,11 +112,8 @@ message Gossip { * Gossip Overview */ message GossipOverview { - message Seen { - required int32 addressIndex = 1; - required VectorClock version = 2; - } - repeated Seen seen = 1; + /* This is the address indexes for the nodes that have seen this gossip */ + repeated int32 seen = 1; repeated Member unreachable = 2; } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index d03d21fa29..d7d4eb3be5 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -14,6 +14,7 @@ import akka.actor.SupervisorStrategy.Stop import akka.cluster.MemberStatus._ import akka.cluster.ClusterEvent._ import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } +import scala.collection.breakOut /** * Base trait for all cluster messages. All ClusterMessage's are serializable. @@ -593,9 +594,6 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with val comparison = remoteGossip.version compareTo localGossip.version val (winningGossip, talkback, gossipType) = comparison match { - case VectorClock.Concurrent ⇒ - // conflicting versions, merge - (remoteGossip merge localGossip, true, Merge) case VectorClock.Same ⇒ // same version (remoteGossip mergeSeen localGossip, !remoteGossip.seenByNode(selfUniqueAddress), Same) @@ -605,6 +603,9 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with case VectorClock.After ⇒ // remote is newer (remoteGossip, !remoteGossip.seenByNode(selfUniqueAddress), Newer) + case _ ⇒ + // conflicting versions, merge + (remoteGossip merge localGossip, true, Merge) } latestGossip = winningGossip seen selfUniqueAddress @@ -657,15 +658,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with val preferredGossipTargets: Vector[UniqueAddress] = if (ThreadLocalRandom.current.nextDouble() < GossipDifferentViewProbability) { // If it's time to try to gossip to some nodes with a different view - // gossip to a random alive member with preference to a member with older or newer gossip version - val localMemberAddressesSet = localGossip.members map { _.uniqueAddress } - val nodesWithDifferentView = for { - (node, version) ← localGossip.overview.seen - if localMemberAddressesSet contains node - if version != localGossip.version - } yield node - - nodesWithDifferentView.toVector + // gossip to a random alive member with preference to a member with older gossip version + localGossip.members.collect { case m if !localGossip.seenByNode(m.uniqueAddress) ⇒ m.uniqueAddress }(breakOut) } else Vector.empty[UniqueAddress] if (preferredGossipTargets.nonEmpty) { @@ -917,8 +911,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with def updateLatestGossip(newGossip: Gossip): Unit = { // Updating the vclock version for the changes val versionedGossip = newGossip :+ vclockNode - // Updating the `seen` table - val seenVersionedGossip = versionedGossip seen selfUniqueAddress + // Nobody else have seen this gossip but us + val seenVersionedGossip = versionedGossip onlySeen (selfUniqueAddress) // Update the state with the new gossip latestGossip = seenVersionedGossip } diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index fa51fa1adb..72a86daf64 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -5,7 +5,6 @@ package akka.cluster import scala.collection.immutable -import scala.collection.immutable.TreeMap import MemberStatus._ /** @@ -80,11 +79,10 @@ private[cluster] case class Gossip( format (allowedLiveMemberStatus.mkString(", "), (members filter hasNotAllowedLiveMemberStatus).mkString(", "))) - val seenButNotMember = overview.seen.keySet -- members.map(_.uniqueAddress) -- overview.unreachable.map(_.uniqueAddress) + val seenButNotMember = overview.seen -- members.map(_.uniqueAddress) -- overview.unreachable.map(_.uniqueAddress) if (seenButNotMember.nonEmpty) throw new IllegalArgumentException("Nodes not part of cluster have marked the Gossip as seen, got [%s]" format seenButNotMember.mkString(", ")) - } /** @@ -102,53 +100,34 @@ private[cluster] case class Gossip( /** * Marks the gossip as seen by this node (address) by updating the address entry in the 'gossip.overview.seen' - * Map with the VectorClock (version) for the new gossip. */ def seen(node: UniqueAddress): Gossip = { if (seenByNode(node)) this - else this copy (overview = overview copy (seen = overview.seen + (node -> version))) + else this copy (overview = overview copy (seen = overview.seen + node)) } /** - * The nodes that have seen current version of the Gossip. + * Marks the gossip as seen by only this node (address) by replacing the 'gossip.overview.seen' */ - def seenBy: Set[UniqueAddress] = { - overview.seen.collect { - case (node, vclock) if vclock == version ⇒ node - }.toSet + def onlySeen(node: UniqueAddress): Gossip = { + this copy (overview = overview copy (seen = Set(node))) } + /** + * The nodes that have seen the current version of the Gossip. + */ + def seenBy: Set[UniqueAddress] = overview.seen + /** * Has this Gossip been seen by this node. */ - def seenByNode(node: UniqueAddress): Boolean = { - overview.seen.get(node).exists(_ == version) - } - - private def mergeSeenTables(allowed: immutable.SortedSet[Member], - one: TreeMap[UniqueAddress, VectorClock], - another: TreeMap[UniqueAddress, VectorClock]): TreeMap[UniqueAddress, VectorClock] = - (TreeMap.empty[UniqueAddress, VectorClock] /: allowed) { - (merged, member) ⇒ - val node = member.uniqueAddress - (one.get(node), another.get(node)) match { - case (None, None) ⇒ merged - case (Some(v1), None) ⇒ merged.updated(node, v1) - case (None, Some(v2)) ⇒ merged.updated(node, v2) - case (Some(v1), Some(v2)) ⇒ - v1 compareTo v2 match { - case VectorClock.Same | VectorClock.After ⇒ merged.updated(node, v1) - case VectorClock.Before ⇒ merged.updated(node, v2) - case VectorClock.Concurrent ⇒ merged - } - } - } + def seenByNode(node: UniqueAddress): Boolean = overview.seen(node) /** * Merges the seen table of two Gossip instances. */ def mergeSeen(that: Gossip): Gossip = - this copy (overview = overview copy (seen = mergeSeenTables(members, overview.seen, that.overview.seen))) + this copy (overview = overview copy (seen = overview.seen ++ that.overview.seen)) /** * Merges two Gossip instances including membership tables, and the VectorClock histories. @@ -166,8 +145,8 @@ private[cluster] case class Gossip( // and exclude unreachable val mergedMembers = Gossip.emptyMembers ++ Member.pickHighestPriority(this.members, that.members).filterNot(mergedUnreachable.contains) - // 4. merge seen table - val mergedSeen = mergeSeenTables(mergedMembers, overview.seen, that.overview.seen) + // 4. Nobody can have seen this new gossip yet + val mergedSeen = Set.empty[UniqueAddress] Gossip(mergedMembers, GossipOverview(mergedSeen, mergedUnreachable), mergedVClock) } @@ -237,7 +216,7 @@ private[cluster] case class Gossip( */ @SerialVersionUID(1L) private[cluster] case class GossipOverview( - seen: TreeMap[UniqueAddress, VectorClock] = TreeMap.empty, + seen: Set[UniqueAddress] = Set.empty, unreachable: Set[Member] = Set.empty) { override def toString = diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala index a56d61e41c..680a63c9e5 100644 --- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala @@ -144,8 +144,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ val addressMapping = allAddresses.zipWithIndex.toMap val allRoles = allMembers.foldLeft(Set.empty[String])((acc, m) ⇒ acc ++ m.roles).to[Vector] val roleMapping = allRoles.zipWithIndex.toMap - val allHashes = gossip.overview.seen.values.foldLeft(gossip.version.versions.keys.toSet)( - { case (s, VectorClock(v)) ⇒ s ++ v.keys }).to[Vector] + val allHashes = gossip.version.versions.keys.to[Vector] val hashMapping = allHashes.zipWithIndex.toMap def mapUniqueAddress(uniqueAddress: UniqueAddress) = mapWithErrorMessage(addressMapping, uniqueAddress, "address") @@ -155,14 +154,9 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ msg.Member(mapUniqueAddress(member.uniqueAddress), member.upNumber, msg.MemberStatus.valueOf(memberStatusToInt(member.status)), member.roles.map(mapRole)(breakOut)) - def seenToProto(seen: (UniqueAddress, VectorClock)) = { - val (address, version) = seen - msg.GossipOverview.Seen(mapUniqueAddress(address), vectorClockToProto(version, hashMapping)) - } - val unreachable: Vector[msg.Member] = gossip.overview.unreachable.map(memberToProto)(breakOut) val members: Vector[msg.Member] = gossip.members.map(memberToProto)(breakOut) - val seen: Vector[msg.GossipOverview.Seen] = gossip.overview.seen.map(seenToProto)(breakOut) + val seen: Vector[Int] = gossip.overview.seen.map(mapUniqueAddress)(breakOut) val overview = msg.GossipOverview(seen, unreachable) @@ -202,12 +196,9 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ new Member(addressMapping(member.addressIndex), member.upNumber, memberStatusFromInt(member.status.id), member.rolesIndexes.map(roleMapping)(breakOut)) - def seenFromProto(seen: msg.GossipOverview.Seen) = - (addressMapping(seen.addressIndex), vectorClockFromProto(seen.version, hashMapping)) - val members: immutable.SortedSet[Member] = gossip.members.map(memberFromProto)(breakOut) val unreachable: immutable.Set[Member] = gossip.overview.unreachable.map(memberFromProto)(breakOut) - val seen: immutable.TreeMap[UniqueAddress, VectorClock] = gossip.overview.seen.map(seenFromProto)(breakOut) + val seen: Set[UniqueAddress] = gossip.overview.seen.map(addressMapping)(breakOut) val overview = GossipOverview(seen, unreachable) Gossip(members, overview, vectorClockFromProto(gossip.version, hashMapping)) diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala index a95e9c5ca4..fe89f4536e 100644 --- a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala @@ -105,17 +105,14 @@ class GossipSpec extends WordSpec with MustMatchers { val g3 = (g1 copy (version = g2.version)).seen(d1.uniqueAddress) def checkMerged(merged: Gossip) { - val keys = merged.overview.seen.keys.toSeq - keys.length must be(4) - keys.toSet must be(Set(a1.uniqueAddress, b1.uniqueAddress, c1.uniqueAddress, d1.uniqueAddress)) + val seen = merged.overview.seen.toSeq + seen.length must be(0) - merged seenByNode (a1.uniqueAddress) must be(true) + merged seenByNode (a1.uniqueAddress) must be(false) merged seenByNode (b1.uniqueAddress) must be(false) - merged seenByNode (c1.uniqueAddress) must be(true) - merged seenByNode (d1.uniqueAddress) must be(true) + merged seenByNode (c1.uniqueAddress) must be(false) + merged seenByNode (d1.uniqueAddress) must be(false) merged seenByNode (e1.uniqueAddress) must be(false) - - merged.overview.seen(b1.uniqueAddress) must be(g1.version) } checkMerged(g3 merge g2)