Merge pull request #1535 from akka/wip-3441-speed-up-cluster-gossip-processing-ban

Speed up cluster gossip processing #3441
This commit is contained in:
Björn Antonsson 2013-06-20 03:56:16 -07:00
commit 46966c25ea
7 changed files with 128 additions and 237 deletions

View file

@ -150,7 +150,8 @@ message VectorClock {
required int32 hashIndex = 1; required int32 hashIndex = 1;
required int64 timestamp = 2; required int64 timestamp = 2;
} }
required int64 timestamp = 1; // the timestamp could be removed but left for test data compatibility
optional int64 timestamp = 1;
repeated Version versions = 2; repeated Version versions = 2;
} }

View file

@ -9,7 +9,6 @@ import scala.collection.immutable
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.util.control.NonFatal import scala.util.control.NonFatal
import java.util.UUID
import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, PoisonPill, ReceiveTimeout, RootActorPath, Scheduler } import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, PoisonPill, ReceiveTimeout, RootActorPath, Scheduler }
import akka.actor.OneForOneStrategy import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy.Stop import akka.actor.SupervisorStrategy.Stop
@ -215,15 +214,17 @@ private[cluster] final class ClusterCoreSupervisor extends Actor with ActorLoggi
/** /**
* INTERNAL API. * INTERNAL API.
*/ */
private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Actor with ActorLogging private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with ActorLogging
with RequiresMessageQueue[UnboundedMessageQueueSemantics] { with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
import InternalClusterAction._ import InternalClusterAction._
val cluster = Cluster(context.system) val cluster = Cluster(context.system)
import cluster.{ selfAddress, selfUniqueAddress, scheduler, failureDetector } import cluster.{ selfAddress, scheduler, failureDetector }
import cluster.settings._ import cluster.settings._
import cluster.InfoLogger._ import cluster.InfoLogger._
protected def selfUniqueAddress = cluster.selfUniqueAddress
val NumberOfGossipsBeforeShutdownWhenLeaderExits = 3 val NumberOfGossipsBeforeShutdownWhenLeaderExits = 3
def vclockName(node: UniqueAddress): String = node.address + "-" + node.uid def vclockName(node: UniqueAddress): String = node.address + "-" + node.uid
@ -548,49 +549,63 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
else if (latestGossip.members.forall(_.uniqueAddress != from)) else if (latestGossip.members.forall(_.uniqueAddress != from))
logInfo("Ignoring received gossip status from unknown [{}]", from) logInfo("Ignoring received gossip status from unknown [{}]", from)
else { else {
(status.version tryCompareTo latestGossip.version) match { (status.version compareTo latestGossip.version) match {
case Some(0) // same version case VectorClock.Same // same version
case Some(x) if x > 0 gossipStatusTo(from, sender) // remote is newer case VectorClock.After gossipStatusTo(from, sender) // remote is newer
case _ gossipTo(from, sender) // conflicting or local is newer case _ gossipTo(from, sender) // conflicting or local is newer
} }
} }
} }
/**
* The types of gossip actions that receive gossip has performed.
*/
sealed trait ReceiveGossipType
case object Ignored extends ReceiveGossipType
case object Older extends ReceiveGossipType
case object Newer extends ReceiveGossipType
case object Same extends ReceiveGossipType
case object Merge extends ReceiveGossipType
/** /**
* Receive new gossip. * Receive new gossip.
*/ */
def receiveGossip(envelope: GossipEnvelope): Unit = { def receiveGossip(envelope: GossipEnvelope): ReceiveGossipType = {
val from = envelope.from val from = envelope.from
val remoteGossip = envelope.gossip val remoteGossip = envelope.gossip
val localGossip = latestGossip val localGossip = latestGossip
if (envelope.to != selfUniqueAddress) if (envelope.to != selfUniqueAddress) {
logInfo("Ignoring received gossip intended for someone else, from [{}] to [{}]", from.address, envelope.to) logInfo("Ignoring received gossip intended for someone else, from [{}] to [{}]", from.address, envelope.to)
if (remoteGossip.overview.unreachable.exists(_.address == selfAddress)) Ignored
} else if (remoteGossip.overview.unreachable.exists(_.address == selfAddress)) {
logInfo("Ignoring received gossip with myself as unreachable, from [{}]", from.address) logInfo("Ignoring received gossip with myself as unreachable, from [{}]", from.address)
else if (localGossip.overview.unreachable.exists(_.uniqueAddress == from)) Ignored
} else if (localGossip.overview.unreachable.exists(_.uniqueAddress == from)) {
logInfo("Ignoring received gossip from unreachable [{}] ", from) logInfo("Ignoring received gossip from unreachable [{}] ", from)
else if (localGossip.members.forall(_.uniqueAddress != from)) Ignored
} else if (localGossip.members.forall(_.uniqueAddress != from)) {
logInfo("Ignoring received gossip from unknown [{}]", from) logInfo("Ignoring received gossip from unknown [{}]", from)
else if (remoteGossip.members.forall(_.uniqueAddress != selfUniqueAddress)) Ignored
} else if (remoteGossip.members.forall(_.uniqueAddress != selfUniqueAddress)) {
logInfo("Ignoring received gossip that does not contain myself, from [{}]", from) logInfo("Ignoring received gossip that does not contain myself, from [{}]", from)
else { Ignored
} else {
val comparison = remoteGossip.version compareTo localGossip.version
val comparison = remoteGossip.version tryCompareTo localGossip.version val (winningGossip, talkback, gossipType) = comparison match {
case VectorClock.Concurrent
val (winningGossip, talkback) = comparison match {
case None
// conflicting versions, merge // conflicting versions, merge
(remoteGossip merge localGossip, true) (remoteGossip merge localGossip, true, Merge)
case Some(0) case VectorClock.Same
// same version // same version
(remoteGossip mergeSeen localGossip, !remoteGossip.seenByNode(selfUniqueAddress)) (remoteGossip mergeSeen localGossip, !remoteGossip.seenByNode(selfUniqueAddress), Same)
case Some(x) if x < 0 case VectorClock.Before
// local is newer // local is newer
(localGossip, true) (localGossip, true, Older)
case _ case VectorClock.After
// remote is newer // remote is newer
(remoteGossip, !remoteGossip.seenByNode(selfUniqueAddress)) (remoteGossip, !remoteGossip.seenByNode(selfUniqueAddress), Newer)
} }
latestGossip = winningGossip seen selfUniqueAddress latestGossip = winningGossip seen selfUniqueAddress
@ -602,18 +617,18 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from) log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from)
if (comparison.isEmpty) { if (comparison == VectorClock.Concurrent) {
log.debug( log.debug(
"""Couldn't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merged them into [{}]""", """Couldn't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merged them into [{}]""",
remoteGossip, localGossip, winningGossip) remoteGossip, localGossip, winningGossip)
} }
if (statsEnabled) { if (statsEnabled) {
gossipStats = comparison match { gossipStats = gossipType match {
case None gossipStats.incrementMergeCount case Merge gossipStats.incrementMergeCount
case Some(0) gossipStats.incrementSameCount case Same gossipStats.incrementSameCount
case Some(x) if x < 0 gossipStats.incrementNewerCount case Newer gossipStats.incrementNewerCount
case _ gossipStats.incrementOlderCount case Older gossipStats.incrementOlderCount
} }
} }
@ -626,6 +641,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
// older or sender had newer // older or sender had newer
gossipTo(from, sender) gossipTo(from, sender)
} }
gossipType
} }
} }

View file

@ -5,6 +5,7 @@
package akka.cluster package akka.cluster
import scala.collection.immutable import scala.collection.immutable
import scala.collection.immutable.TreeMap
import MemberStatus._ import MemberStatus._
/** /**
@ -61,8 +62,7 @@ private[cluster] object Gossip {
private[cluster] case class Gossip( private[cluster] case class Gossip(
members: immutable.SortedSet[Member], // sorted set of members with their status, sorted by address members: immutable.SortedSet[Member], // sorted set of members with their status, sorted by address
overview: GossipOverview = GossipOverview(), overview: GossipOverview = GossipOverview(),
version: VectorClock = VectorClock()) // vector clock version version: VectorClock = VectorClock()) { // vector clock version
extends Versioned[Gossip] {
// TODO can be disabled as optimization // TODO can be disabled as optimization
assertInvariants() assertInvariants()
@ -125,8 +125,10 @@ private[cluster] case class Gossip(
overview.seen.get(node).exists(_ == version) overview.seen.get(node).exists(_ == version)
} }
private def mergeSeenTables(allowed: Set[Member], one: Map[UniqueAddress, VectorClock], another: Map[UniqueAddress, VectorClock]): Map[UniqueAddress, VectorClock] = { private def mergeSeenTables(allowed: immutable.SortedSet[Member],
(Map.empty[UniqueAddress, VectorClock] /: allowed) { one: TreeMap[UniqueAddress, VectorClock],
another: TreeMap[UniqueAddress, VectorClock]): TreeMap[UniqueAddress, VectorClock] =
(TreeMap.empty[UniqueAddress, VectorClock] /: allowed) {
(merged, member) (merged, member)
val node = member.uniqueAddress val node = member.uniqueAddress
(one.get(node), another.get(node)) match { (one.get(node), another.get(node)) match {
@ -134,14 +136,13 @@ private[cluster] case class Gossip(
case (Some(v1), None) merged.updated(node, v1) case (Some(v1), None) merged.updated(node, v1)
case (None, Some(v2)) merged.updated(node, v2) case (None, Some(v2)) merged.updated(node, v2)
case (Some(v1), Some(v2)) case (Some(v1), Some(v2))
v1 tryCompareTo v2 match { v1 compareTo v2 match {
case None merged case VectorClock.Same | VectorClock.After merged.updated(node, v1)
case Some(x) if x > 0 merged.updated(node, v1) case VectorClock.Before merged.updated(node, v2)
case _ merged.updated(node, v2) case VectorClock.Concurrent merged
} }
} }
} }
}
/** /**
* Merges the seen table of two Gossip instances. * Merges the seen table of two Gossip instances.
@ -236,7 +237,7 @@ private[cluster] case class Gossip(
*/ */
@SerialVersionUID(1L) @SerialVersionUID(1L)
private[cluster] case class GossipOverview( private[cluster] case class GossipOverview(
seen: Map[UniqueAddress, VectorClock] = Map.empty, seen: TreeMap[UniqueAddress, VectorClock] = TreeMap.empty,
unreachable: Set[Member] = Set.empty) { unreachable: Set[Member] = Set.empty) {
override def toString = override def toString =

View file

@ -118,13 +118,7 @@ object Member {
*/ */
implicit val ordering: Ordering[Member] = new Ordering[Member] { implicit val ordering: Ordering[Member] = new Ordering[Member] {
def compare(a: Member, b: Member): Int = { def compare(a: Member, b: Member): Int = {
val result = addressOrdering.compare(a.address, b.address) a.uniqueAddress compare b.uniqueAddress
if (result == 0) {
val aUid = a.uniqueAddress.uid
val bUid = b.uniqueAddress.uid
if (aUid < bUid) -1 else if (aUid == bUid) 0 else 1
} else
result
} }
} }
@ -218,4 +212,13 @@ object MemberStatus {
* INTERNAL API * INTERNAL API
*/ */
@SerialVersionUID(1L) @SerialVersionUID(1L)
private[cluster] case class UniqueAddress(address: Address, uid: Int) private[cluster] case class UniqueAddress(address: Address, uid: Int) extends Ordered[UniqueAddress] {
@transient
override lazy val hashCode = scala.util.hashing.MurmurHash3.productHash(this)
override def compare(that: UniqueAddress): Int = {
val result = Member.addressOrdering.compare(this.address, that.address)
if (result == 0) if (this.uid < that.uid) -1 else if (this.uid == that.uid) 0 else 1
else result
}
}

View file

@ -5,87 +5,29 @@
package akka.cluster package akka.cluster
import akka.AkkaException import akka.AkkaException
import akka.event.Logging
import akka.actor.ActorSystem
import System.{ currentTimeMillis newTimestamp } import System.{ currentTimeMillis newTimestamp }
import java.security.MessageDigest import java.security.MessageDigest
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import scala.collection.immutable.TreeMap
class VectorClockException(message: String) extends AkkaException(message)
/**
* Trait to be extended by classes that wants to be versioned using a VectorClock.
*/
trait Versioned[T] {
def version: VectorClock
def :+(node: VectorClock.Node): T
}
/**
* Utility methods for comparing Versioned instances.
*/
object Versioned {
/**
* The result of comparing two Versioned objects.
* Either:
* {{{
* 1) v1 is BEFORE v2 => Before
* 2) v1 is AFTER t2 => After
* 3) v1 happens CONCURRENTLY to v2 => Concurrent
* }}}
*/
sealed trait Ordering
case object Before extends Ordering
case object After extends Ordering
case object Concurrent extends Ordering
/**
* Returns or 'Ordering' for the two 'Versioned' instances.
*/
def compare[T <: Versioned[T]](versioned1: Versioned[T], versioned2: Versioned[T]): Ordering = {
if (versioned1.version <> versioned2.version) Concurrent
else if (versioned1.version < versioned2.version) Before
else After
}
/**
* Returns the Versioned that have the latest version.
*/
def latestVersionOf[T <: Versioned[T]](versioned1: T, versioned2: T): T = {
compare(versioned1, versioned2) match {
case Concurrent versioned2
case Before versioned2
case After versioned1
}
}
}
/** /**
* VectorClock module with helper classes and methods. * VectorClock module with helper classes and methods.
* *
* Based on code from the 'vlock' VectorClock library by Coda Hale. * Based on code from the 'vlock' VectorClock library by Coda Hale.
*/ */
object VectorClock { private[cluster] object VectorClock {
/** /**
* Hash representation of a versioned node name. * Hash representation of a versioned node name.
*/ */
sealed trait Node extends Serializable { type Node = String
def hash: String
}
object Node { object Node {
@SerialVersionUID(1L)
private case class NodeImpl(name: String) extends Node {
override def toString(): String = "Node(" + name + ")"
override def hash: String = name
}
def apply(name: String): Node = NodeImpl(hash(name)) def apply(name: String): Node = hash(name)
def fromHash(hash: String): Node = NodeImpl(hash) def fromHash(hash: String): Node = hash
private def hash(name: String): String = { private def hash(name: String): String = {
val digester = MessageDigest.getInstance("MD5") val digester = MessageDigest.getInstance("MD5")
@ -98,11 +40,8 @@ object VectorClock {
* Timestamp representation a unique 'Ordered' timestamp. * Timestamp representation a unique 'Ordered' timestamp.
*/ */
@SerialVersionUID(1L) @SerialVersionUID(1L)
case class Timestamp(time: Long) extends Ordered[Timestamp] { final case class Timestamp(time: Long) extends Ordered[Timestamp] {
def max(other: Timestamp) = { def max(other: Timestamp) = if (this < other) other else this
if (this < other) other
else this
}
def compare(other: Timestamp) = time compare other.time def compare(other: Timestamp) = time compare other.time
@ -127,6 +66,12 @@ object VectorClock {
new Timestamp(newTime) new Timestamp(newTime)
} }
} }
sealed trait Ordering
case object After extends Ordering
case object Before extends Ordering
case object Same extends Ordering
case object Concurrent extends Ordering
} }
/** /**
@ -141,9 +86,7 @@ object VectorClock {
*/ */
@SerialVersionUID(1L) @SerialVersionUID(1L)
case class VectorClock( case class VectorClock(
timestamp: VectorClock.Timestamp = VectorClock.Timestamp(), versions: TreeMap[VectorClock.Node, VectorClock.Timestamp] = TreeMap.empty[VectorClock.Node, VectorClock.Timestamp]) {
versions: Map[VectorClock.Node, VectorClock.Timestamp] = Map.empty[VectorClock.Node, VectorClock.Timestamp])
extends PartiallyOrdered[VectorClock] {
import VectorClock._ import VectorClock._
@ -155,7 +98,17 @@ case class VectorClock(
/** /**
* Returns true if <code>this</code> and <code>that</code> are concurrent else false. * Returns true if <code>this</code> and <code>that</code> are concurrent else false.
*/ */
def <>(that: VectorClock): Boolean = tryCompareTo(that) == None def <>(that: VectorClock): Boolean = compareTo(that) == Concurrent
/**
* Returns true if <code>this</code> is before <code>that</code> else false.
*/
def <(that: VectorClock): Boolean = compareTo(that) == Before
/**
* Returns true if <code>this</code> is after <code>that</code> else false.
*/
def >(that: VectorClock): Boolean = compareTo(that) == After
/** /**
* Returns true if this VectorClock has the same history as the 'that' VectorClock else false. * Returns true if this VectorClock has the same history as the 'that' VectorClock else false.
@ -163,39 +116,37 @@ case class VectorClock(
def ==(that: VectorClock): Boolean = versions == that.versions def ==(that: VectorClock): Boolean = versions == that.versions
/** /**
* For the 'PartiallyOrdered' trait, to allow natural comparisons using <, > and ==. * Compare two vector clocks. The outcome will be one of the following:
* <p/>
* Compare two vector clocks. The outcomes will be one of the following:
* <p/> * <p/>
* {{{ * {{{
* 1. Clock 1 is BEFORE (>) Clock 2 if there exists an i such that c1(i) <= c2(i) and there does not exist a j such that c1(j) > c2(j). * 1. Clock 1 is SAME (==) as Clock 2 iff for all i c1(i) == c2(i)
* 2. Clock 1 is CONCURRENT (<>) to Clock 2 if there exists an i, j such that c1(i) < c2(i) and c1(j) > c2(j). * 2. Clock 1 is BEFORE (<) Clock 2 iff for all i c1(i) <= c2(i) and there exist a j such that c1(j) < c2(j)
* 3. Clock 1 is AFTER (<) Clock 2 otherwise. * 3. Clock 1 is AFTER (>) Clock 2 iff for all i c1(i) >= c2(i) and there exist a j such that c1(j) > c2(j).
* 4. Clock 1 is CONCURRENT (<>) to Clock 2 otherwise.
* }}} * }}}
*/ */
def tryCompareTo[V >: VectorClock <% PartiallyOrdered[V]](vclock: V): Option[Int] = { def compareTo(that: VectorClock): Ordering = {
def compare(versions1: Map[Node, Timestamp], versions2: Map[Node, Timestamp]): Boolean = { def olderOrSameIn(version: Pair[Node, Timestamp], versions: Map[Node, Timestamp]): Boolean = {
versions1.forall { case ((n, t)) t <= versions2.getOrElse(n, Timestamp.zero) } && version match { case (n, t) t <= versions(n) }
(versions1.exists { case ((n, t)) t < versions2.getOrElse(n, Timestamp.zero) } ||
(versions1.size < versions2.size))
}
vclock match {
case VectorClock(_, otherVersions)
if (compare(versions, otherVersions)) Some(-1)
else if (compare(otherVersions, versions)) Some(1)
else if (versions == otherVersions) Some(0)
else None
case _ None
} }
if (versions == that.versions) Same
else if (versions.forall(olderOrSameIn(_, that.versions.withDefaultValue(Timestamp.zero)))) Before
else if (that.versions.forall(olderOrSameIn(_, versions.withDefaultValue(Timestamp.zero)))) After
else Concurrent
} }
/** /**
* Merges this VectorClock with another VectorClock. E.g. merges its versioned history. * Merges this VectorClock with another VectorClock. E.g. merges its versioned history.
*/ */
def merge(that: VectorClock): VectorClock = { def merge(that: VectorClock): VectorClock = {
val mergedVersions = scala.collection.mutable.Map.empty[Node, Timestamp] ++ that.versions var mergedVersions = that.versions
for ((node, time) versions) mergedVersions(node) = time max mergedVersions.getOrElse(node, time) for ((node, time) versions) {
VectorClock(timestamp, Map.empty[Node, Timestamp] ++ mergedVersions) val mergedVersionsCurrentTime = mergedVersions.getOrElse(node, Timestamp.zero)
if (time > mergedVersionsCurrentTime)
mergedVersions = mergedVersions.updated(node, time)
}
VectorClock(mergedVersions)
} }
override def toString = versions.map { case ((n, t)) n + " -> " + t }.mkString("VectorClock(", ", ", ")") override def toString = versions.map { case ((n, t)) n + " -> " + t }.mkString("VectorClock(", ", ", ")")

View file

@ -6,6 +6,7 @@ package akka.cluster.protobuf
import akka.serialization.Serializer import akka.serialization.Serializer
import akka.cluster._ import akka.cluster._
import scala.collection.breakOut import scala.collection.breakOut
import scala.collection.immutable.TreeMap
import akka.actor.{ ExtendedActorSystem, Address } import akka.actor.{ ExtendedActorSystem, Address }
import scala.Some import scala.Some
import scala.collection.immutable import scala.collection.immutable
@ -60,7 +61,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
case m: MetricsGossipEnvelope case m: MetricsGossipEnvelope
compress(metricsGossipEnvelopeToProto(m)) compress(metricsGossipEnvelopeToProto(m))
case InternalClusterAction.Join(node, roles) case InternalClusterAction.Join(node, roles)
msg.Join(uniqueAddressToProto(node), roles.map(identity)(breakOut): Vector[String]).toByteArray msg.Join(uniqueAddressToProto(node), roles.toVector).toByteArray
case InternalClusterAction.Welcome(from, gossip) case InternalClusterAction.Welcome(from, gossip)
compress(msg.Welcome(uniqueAddressToProto(from), gossipToProto(gossip))) compress(msg.Welcome(uniqueAddressToProto(from), gossipToProto(gossip)))
case ClusterUserAction.Leave(address) case ClusterUserAction.Leave(address)
@ -162,8 +163,8 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
val addressMapping = allAddresses.zipWithIndex.toMap val addressMapping = allAddresses.zipWithIndex.toMap
val allRoles = allMembers.foldLeft(Set.empty[String])((acc, m) acc ++ m.roles).to[Vector] val allRoles = allMembers.foldLeft(Set.empty[String])((acc, m) acc ++ m.roles).to[Vector]
val roleMapping = allRoles.zipWithIndex.toMap val roleMapping = allRoles.zipWithIndex.toMap
val allHashes = gossip.overview.seen.values.foldLeft(gossip.version.versions.keys.map(_.hash).toSet) { val allHashes = gossip.overview.seen.values.foldLeft(gossip.version.versions.keys.toSet) {
case (s, VectorClock(t, v)) s ++ v.keys.map(_.hash) case (s, VectorClock(v)) s ++ v.keys
}.to[Vector] }.to[Vector]
val hashMapping = allHashes.zipWithIndex.toMap val hashMapping = allHashes.zipWithIndex.toMap
@ -192,8 +193,8 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
private def vectorClockToProto(version: VectorClock, hashMapping: Map[String, Int]): msg.VectorClock = { private def vectorClockToProto(version: VectorClock, hashMapping: Map[String, Int]): msg.VectorClock = {
def mapHash(hash: String) = mapWithErrorMessage(hashMapping, hash, "hash") def mapHash(hash: String) = mapWithErrorMessage(hashMapping, hash, "hash")
msg.VectorClock(version.timestamp.time, msg.VectorClock(None,
version.versions.map { case (n, t) msg.VectorClock.Version(mapHash(n.hash), t.time) }.to[Vector]) version.versions.map { case (n, t) msg.VectorClock.Version(mapHash(n), t.time) }.to[Vector])
} }
private def gossipEnvelopeToProto(envelope: GossipEnvelope): msg.GossipEnvelope = { private def gossipEnvelopeToProto(envelope: GossipEnvelope): msg.GossipEnvelope = {
@ -202,7 +203,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
} }
private def gossipStatusToProto(status: GossipStatus): msg.GossipStatus = { private def gossipStatusToProto(status: GossipStatus): msg.GossipStatus = {
val allHashes: Vector[String] = status.version.versions.keys.map(_.hash)(collection.breakOut) val allHashes: Vector[String] = status.version.versions.keys.toVector
val hashMapping = allHashes.zipWithIndex.toMap val hashMapping = allHashes.zipWithIndex.toMap
msg.GossipStatus(uniqueAddressToProto(status.from), allHashes, vectorClockToProto(status.version, hashMapping)) msg.GossipStatus(uniqueAddressToProto(status.from), allHashes, vectorClockToProto(status.version, hashMapping))
} }
@ -230,18 +231,17 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ
val members = gossip.members.map(memberFromProto).to[immutable.SortedSet] val members = gossip.members.map(memberFromProto).to[immutable.SortedSet]
val unreachable = gossip.overview.unreachable.map(memberFromProto).toSet val unreachable = gossip.overview.unreachable.map(memberFromProto).toSet
val seen = gossip.overview.seen.map(seenFromProto).toMap val seen = gossip.overview.seen.map(seenFromProto)(breakOut): TreeMap[UniqueAddress, VectorClock]
val overview = GossipOverview(seen, unreachable) val overview = GossipOverview(seen, unreachable)
Gossip(members, overview, vectorClockFromProto(gossip.version, hashMapping)) Gossip(members, overview, vectorClockFromProto(gossip.version, hashMapping))
} }
private def vectorClockFromProto(version: msg.VectorClock, hashMapping: immutable.Seq[String]) = { private def vectorClockFromProto(version: msg.VectorClock, hashMapping: immutable.Seq[String]) = {
VectorClock(VectorClock.Timestamp(version.timestamp), VectorClock(version.versions.map({
version.versions.map { case msg.VectorClock.Version(h, t)
case msg.VectorClock.Version(h, t) (VectorClock.Node.fromHash(hashMapping(h)), VectorClock.Timestamp(t))
(VectorClock.Node.fromHash(hashMapping(h)), VectorClock.Timestamp(t)) })(breakOut): TreeMap[VectorClock.Node, VectorClock.Timestamp])
}.toMap)
} }
private def gossipEnvelopeFromProto(envelope: msg.GossipEnvelope): GossipEnvelope = { private def gossipEnvelopeFromProto(envelope: msg.GossipEnvelope): GossipEnvelope = {

View file

@ -199,7 +199,6 @@ class VectorClockSpec extends AkkaSpec {
"pass blank clock incrementing" in { "pass blank clock incrementing" in {
val node1 = Node("1") val node1 = Node("1")
val node2 = Node("2") val node2 = Node("2")
val node3 = Node("3")
val v1 = VectorClock() val v1 = VectorClock()
val v2 = VectorClock() val v2 = VectorClock()
@ -236,84 +235,4 @@ class VectorClockSpec extends AkkaSpec {
(c1 > b1) must equal(true) (c1 > b1) must equal(true)
} }
} }
"An instance of Versioned" must {
class TestVersioned(val version: VectorClock = VectorClock()) extends Versioned[TestVersioned] {
def :+(node: Node): TestVersioned = new TestVersioned(version :+ node)
}
import Versioned.latestVersionOf
"have zero versions when created" in {
val versioned = new TestVersioned()
versioned.version.versions must be(Map())
}
"happen before an identical versioned with a single additional event" in {
val versioned1_1 = new TestVersioned()
val versioned2_1 = versioned1_1 :+ Node("1")
val versioned3_1 = versioned2_1 :+ Node("2")
val versioned4_1 = versioned3_1 :+ Node("1")
val versioned1_2 = new TestVersioned()
val versioned2_2 = versioned1_2 :+ Node("1")
val versioned3_2 = versioned2_2 :+ Node("2")
val versioned4_2 = versioned3_2 :+ Node("1")
val versioned5_2 = versioned4_2 :+ Node("3")
latestVersionOf[TestVersioned](versioned4_1, versioned5_2) must be(versioned5_2)
}
"pass misc comparison test 1" in {
var versioned1_1 = new TestVersioned()
val versioned2_1 = versioned1_1 :+ Node("1")
val versioned1_2 = new TestVersioned()
val versioned2_2 = versioned1_2 :+ Node("2")
latestVersionOf[TestVersioned](versioned2_1, versioned2_2) must be(versioned2_2)
}
"pass misc comparison test 2" in {
val versioned1_3 = new TestVersioned()
val versioned2_3 = versioned1_3 :+ Node("1")
val versioned3_3 = versioned2_3 :+ Node("2")
val versioned4_3 = versioned3_3 :+ Node("1")
val versioned1_4 = new TestVersioned()
val versioned2_4 = versioned1_4 :+ Node("1")
val versioned3_4 = versioned2_4 :+ Node("1")
val versioned4_4 = versioned3_4 :+ Node("3")
latestVersionOf[TestVersioned](versioned4_3, versioned4_4) must be(versioned4_4)
}
"pass misc comparison test 3" in {
val versioned1_1 = new TestVersioned()
val versioned2_1 = versioned1_1 :+ Node("2")
val versioned3_1 = versioned2_1 :+ Node("2")
val versioned1_2 = new TestVersioned()
val versioned2_2 = versioned1_2 :+ Node("1")
val versioned3_2 = versioned2_2 :+ Node("2")
val versioned4_2 = versioned3_2 :+ Node("2")
val versioned5_2 = versioned4_2 :+ Node("3")
latestVersionOf[TestVersioned](versioned3_1, versioned5_2) must be(versioned5_2)
}
"pass misc comparison test 4" in {
val versioned1_1 = new TestVersioned()
val versioned2_1 = versioned1_1 :+ Node("1")
val versioned3_1 = versioned2_1 :+ Node("2")
val versioned4_1 = versioned3_1 :+ Node("2")
val versioned5_1 = versioned4_1 :+ Node("3")
val versioned1_2 = new TestVersioned()
val versioned2_2 = versioned1_2 :+ Node("2")
val versioned3_2 = versioned2_2 :+ Node("2")
latestVersionOf[TestVersioned](versioned5_1, versioned3_2) must be(versioned3_2)
}
}
} }