!clu #2320 Convert the seen table into something more efficient

This commit is contained in:
Björn Antonsson 2013-09-02 13:50:46 +02:00
parent c6345d2770
commit bbad92c749
5 changed files with 33 additions and 75 deletions

View file

@ -112,11 +112,8 @@ message Gossip {
* Gossip Overview
*/
message GossipOverview {
message Seen {
required int32 addressIndex = 1;
required VectorClock version = 2;
}
repeated Seen seen = 1;
/* This is the address indexes for the nodes that have seen this gossip */
repeated int32 seen = 1;
repeated Member unreachable = 2;
}

View file

@ -14,6 +14,7 @@ import akka.actor.SupervisorStrategy.Stop
import akka.cluster.MemberStatus._
import akka.cluster.ClusterEvent._
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
import scala.collection.breakOut
/**
* Base trait for all cluster messages. All ClusterMessage's are serializable.
@ -593,9 +594,6 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
val comparison = remoteGossip.version compareTo localGossip.version
val (winningGossip, talkback, gossipType) = comparison match {
case VectorClock.Concurrent
// conflicting versions, merge
(remoteGossip merge localGossip, true, Merge)
case VectorClock.Same
// same version
(remoteGossip mergeSeen localGossip, !remoteGossip.seenByNode(selfUniqueAddress), Same)
@ -605,6 +603,9 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
case VectorClock.After
// remote is newer
(remoteGossip, !remoteGossip.seenByNode(selfUniqueAddress), Newer)
case _
// conflicting versions, merge
(remoteGossip merge localGossip, true, Merge)
}
latestGossip = winningGossip seen selfUniqueAddress
@ -657,15 +658,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
val preferredGossipTargets: Vector[UniqueAddress] =
if (ThreadLocalRandom.current.nextDouble() < GossipDifferentViewProbability) { // If it's time to try to gossip to some nodes with a different view
// gossip to a random alive member with preference to a member with older or newer gossip version
val localMemberAddressesSet = localGossip.members map { _.uniqueAddress }
val nodesWithDifferentView = for {
(node, version) localGossip.overview.seen
if localMemberAddressesSet contains node
if version != localGossip.version
} yield node
nodesWithDifferentView.toVector
// gossip to a random alive member with preference to a member with older gossip version
localGossip.members.collect { case m if !localGossip.seenByNode(m.uniqueAddress) m.uniqueAddress }(breakOut)
} else Vector.empty[UniqueAddress]
if (preferredGossipTargets.nonEmpty) {
@ -917,8 +911,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
def updateLatestGossip(newGossip: Gossip): Unit = {
// Updating the vclock version for the changes
val versionedGossip = newGossip :+ vclockNode
// Updating the `seen` table
val seenVersionedGossip = versionedGossip seen selfUniqueAddress
// Nobody else have seen this gossip but us
val seenVersionedGossip = versionedGossip onlySeen (selfUniqueAddress)
// Update the state with the new gossip
latestGossip = seenVersionedGossip
}

View file

@ -5,7 +5,6 @@
package akka.cluster
import scala.collection.immutable
import scala.collection.immutable.TreeMap
import MemberStatus._
/**
@ -80,11 +79,10 @@ private[cluster] case class Gossip(
format (allowedLiveMemberStatus.mkString(", "),
(members filter hasNotAllowedLiveMemberStatus).mkString(", ")))
val seenButNotMember = overview.seen.keySet -- members.map(_.uniqueAddress) -- overview.unreachable.map(_.uniqueAddress)
val seenButNotMember = overview.seen -- members.map(_.uniqueAddress) -- overview.unreachable.map(_.uniqueAddress)
if (seenButNotMember.nonEmpty)
throw new IllegalArgumentException("Nodes not part of cluster have marked the Gossip as seen, got [%s]"
format seenButNotMember.mkString(", "))
}
/**
@ -102,53 +100,34 @@ private[cluster] case class Gossip(
/**
* Marks the gossip as seen by this node (address) by updating the address entry in the 'gossip.overview.seen'
* Map with the VectorClock (version) for the new gossip.
*/
def seen(node: UniqueAddress): Gossip = {
if (seenByNode(node)) this
else this copy (overview = overview copy (seen = overview.seen + (node -> version)))
else this copy (overview = overview copy (seen = overview.seen + node))
}
/**
* The nodes that have seen current version of the Gossip.
* Marks the gossip as seen by only this node (address) by replacing the 'gossip.overview.seen'
*/
def seenBy: Set[UniqueAddress] = {
overview.seen.collect {
case (node, vclock) if vclock == version node
}.toSet
def onlySeen(node: UniqueAddress): Gossip = {
this copy (overview = overview copy (seen = Set(node)))
}
/**
* The nodes that have seen the current version of the Gossip.
*/
def seenBy: Set[UniqueAddress] = overview.seen
/**
* Has this Gossip been seen by this node.
*/
def seenByNode(node: UniqueAddress): Boolean = {
overview.seen.get(node).exists(_ == version)
}
private def mergeSeenTables(allowed: immutable.SortedSet[Member],
one: TreeMap[UniqueAddress, VectorClock],
another: TreeMap[UniqueAddress, VectorClock]): TreeMap[UniqueAddress, VectorClock] =
(TreeMap.empty[UniqueAddress, VectorClock] /: allowed) {
(merged, member)
val node = member.uniqueAddress
(one.get(node), another.get(node)) match {
case (None, None) merged
case (Some(v1), None) merged.updated(node, v1)
case (None, Some(v2)) merged.updated(node, v2)
case (Some(v1), Some(v2))
v1 compareTo v2 match {
case VectorClock.Same | VectorClock.After merged.updated(node, v1)
case VectorClock.Before merged.updated(node, v2)
case VectorClock.Concurrent merged
}
}
}
def seenByNode(node: UniqueAddress): Boolean = overview.seen(node)
/**
* Merges the seen table of two Gossip instances.
*/
def mergeSeen(that: Gossip): Gossip =
this copy (overview = overview copy (seen = mergeSeenTables(members, overview.seen, that.overview.seen)))
this copy (overview = overview copy (seen = overview.seen ++ that.overview.seen))
/**
* Merges two Gossip instances including membership tables, and the VectorClock histories.
@ -166,8 +145,8 @@ private[cluster] case class Gossip(
// and exclude unreachable
val mergedMembers = Gossip.emptyMembers ++ Member.pickHighestPriority(this.members, that.members).filterNot(mergedUnreachable.contains)
// 4. merge seen table
val mergedSeen = mergeSeenTables(mergedMembers, overview.seen, that.overview.seen)
// 4. Nobody can have seen this new gossip yet
val mergedSeen = Set.empty[UniqueAddress]
Gossip(mergedMembers, GossipOverview(mergedSeen, mergedUnreachable), mergedVClock)
}
@ -237,7 +216,7 @@ private[cluster] case class Gossip(
*/
@SerialVersionUID(1L)
private[cluster] case class GossipOverview(
seen: TreeMap[UniqueAddress, VectorClock] = TreeMap.empty,
seen: Set[UniqueAddress] = Set.empty,
unreachable: Set[Member] = Set.empty) {
override def toString =

View file

@ -144,8 +144,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
val addressMapping = allAddresses.zipWithIndex.toMap
val allRoles = allMembers.foldLeft(Set.empty[String])((acc, m) acc ++ m.roles).to[Vector]
val roleMapping = allRoles.zipWithIndex.toMap
val allHashes = gossip.overview.seen.values.foldLeft(gossip.version.versions.keys.toSet)(
{ case (s, VectorClock(v)) s ++ v.keys }).to[Vector]
val allHashes = gossip.version.versions.keys.to[Vector]
val hashMapping = allHashes.zipWithIndex.toMap
def mapUniqueAddress(uniqueAddress: UniqueAddress) = mapWithErrorMessage(addressMapping, uniqueAddress, "address")
@ -155,14 +154,9 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
msg.Member(mapUniqueAddress(member.uniqueAddress), member.upNumber,
msg.MemberStatus.valueOf(memberStatusToInt(member.status)), member.roles.map(mapRole)(breakOut))
def seenToProto(seen: (UniqueAddress, VectorClock)) = {
val (address, version) = seen
msg.GossipOverview.Seen(mapUniqueAddress(address), vectorClockToProto(version, hashMapping))
}
val unreachable: Vector[msg.Member] = gossip.overview.unreachable.map(memberToProto)(breakOut)
val members: Vector[msg.Member] = gossip.members.map(memberToProto)(breakOut)
val seen: Vector[msg.GossipOverview.Seen] = gossip.overview.seen.map(seenToProto)(breakOut)
val seen: Vector[Int] = gossip.overview.seen.map(mapUniqueAddress)(breakOut)
val overview = msg.GossipOverview(seen, unreachable)
@ -202,12 +196,9 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
new Member(addressMapping(member.addressIndex), member.upNumber, memberStatusFromInt(member.status.id),
member.rolesIndexes.map(roleMapping)(breakOut))
def seenFromProto(seen: msg.GossipOverview.Seen) =
(addressMapping(seen.addressIndex), vectorClockFromProto(seen.version, hashMapping))
val members: immutable.SortedSet[Member] = gossip.members.map(memberFromProto)(breakOut)
val unreachable: immutable.Set[Member] = gossip.overview.unreachable.map(memberFromProto)(breakOut)
val seen: immutable.TreeMap[UniqueAddress, VectorClock] = gossip.overview.seen.map(seenFromProto)(breakOut)
val seen: Set[UniqueAddress] = gossip.overview.seen.map(addressMapping)(breakOut)
val overview = GossipOverview(seen, unreachable)
Gossip(members, overview, vectorClockFromProto(gossip.version, hashMapping))

View file

@ -105,17 +105,14 @@ class GossipSpec extends WordSpec with MustMatchers {
val g3 = (g1 copy (version = g2.version)).seen(d1.uniqueAddress)
def checkMerged(merged: Gossip) {
val keys = merged.overview.seen.keys.toSeq
keys.length must be(4)
keys.toSet must be(Set(a1.uniqueAddress, b1.uniqueAddress, c1.uniqueAddress, d1.uniqueAddress))
val seen = merged.overview.seen.toSeq
seen.length must be(0)
merged seenByNode (a1.uniqueAddress) must be(true)
merged seenByNode (a1.uniqueAddress) must be(false)
merged seenByNode (b1.uniqueAddress) must be(false)
merged seenByNode (c1.uniqueAddress) must be(true)
merged seenByNode (d1.uniqueAddress) must be(true)
merged seenByNode (c1.uniqueAddress) must be(false)
merged seenByNode (d1.uniqueAddress) must be(false)
merged seenByNode (e1.uniqueAddress) must be(false)
merged.overview.seen(b1.uniqueAddress) must be(g1.version)
}
checkMerged(g3 merge g2)