From 1adfcb84546e63d1a4df2f63cac7edf155f643f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bjo=CC=88rn=20Antonsson?= Date: Thu, 13 Jun 2013 15:43:37 -0400 Subject: [PATCH] Speed up cluster gossip processing. See #3441 Check VectorClock for common case first and cache hashCodes. See #3441 Make ClusterDaemon a bit more testable. See #3441 Changing VectorClock and GossipOverview to TreeMaps. See #3441 Make VectorClock private[cluster] and remove unused code. See #3441 --- .../src/main/protobuf/ClusterMessages.proto | 3 +- .../scala/akka/cluster/ClusterDaemon.scala | 78 ++++++---- .../src/main/scala/akka/cluster/Gossip.scala | 21 +-- .../src/main/scala/akka/cluster/Member.scala | 19 ++- .../main/scala/akka/cluster/VectorClock.scala | 139 ++++++------------ .../protobuf/ClusterMessageSerializer.scala | 24 +-- .../scala/akka/cluster/VectorClockSpec.scala | 81 ---------- 7 files changed, 128 insertions(+), 237 deletions(-) diff --git a/akka-cluster/src/main/protobuf/ClusterMessages.proto b/akka-cluster/src/main/protobuf/ClusterMessages.proto index b0a1ffe6e9..a2575b0a76 100644 --- a/akka-cluster/src/main/protobuf/ClusterMessages.proto +++ b/akka-cluster/src/main/protobuf/ClusterMessages.proto @@ -150,7 +150,8 @@ message VectorClock { required int32 hashIndex = 1; required int64 timestamp = 2; } - required int64 timestamp = 1; + // the timestamp could be removed but left for test data compatibility + optional int64 timestamp = 1; repeated Version versions = 2; } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 99d261f012..a8233110bd 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -9,7 +9,6 @@ import scala.collection.immutable import scala.concurrent.duration._ import scala.concurrent.forkjoin.ThreadLocalRandom import scala.util.control.NonFatal -import java.util.UUID import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, PoisonPill, ReceiveTimeout, RootActorPath, Scheduler } import akka.actor.OneForOneStrategy import akka.actor.SupervisorStrategy.Stop @@ -215,15 +214,17 @@ private[cluster] final class ClusterCoreSupervisor extends Actor with ActorLoggi /** * INTERNAL API. */ -private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Actor with ActorLogging +private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with ActorLogging with RequiresMessageQueue[UnboundedMessageQueueSemantics] { import InternalClusterAction._ val cluster = Cluster(context.system) - import cluster.{ selfAddress, selfUniqueAddress, scheduler, failureDetector } + import cluster.{ selfAddress, scheduler, failureDetector } import cluster.settings._ import cluster.InfoLogger._ + protected def selfUniqueAddress = cluster.selfUniqueAddress + val NumberOfGossipsBeforeShutdownWhenLeaderExits = 3 def vclockName(node: UniqueAddress): String = node.address + "-" + node.uid @@ -549,49 +550,63 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto else if (latestGossip.members.forall(_.uniqueAddress != from)) logInfo("Ignoring received gossip status from unknown [{}]", from) else { - (status.version tryCompareTo latestGossip.version) match { - case Some(0) ⇒ // same version - case Some(x) if x > 0 ⇒ gossipStatusTo(from, sender) // remote is newer - case _ ⇒ gossipTo(from, sender) // conflicting or local is newer + (status.version compareTo latestGossip.version) match { + case VectorClock.Same ⇒ // same version + case VectorClock.After ⇒ gossipStatusTo(from, sender) // remote is newer + case _ ⇒ gossipTo(from, sender) // conflicting or local is newer } } } + /** + * The types of gossip actions that receive gossip has performed. + */ + sealed trait ReceiveGossipType + case object Ignored extends ReceiveGossipType + case object Older extends ReceiveGossipType + case object Newer extends ReceiveGossipType + case object Same extends ReceiveGossipType + case object Merge extends ReceiveGossipType + /** * Receive new gossip. */ - def receiveGossip(envelope: GossipEnvelope): Unit = { + def receiveGossip(envelope: GossipEnvelope): ReceiveGossipType = { val from = envelope.from val remoteGossip = envelope.gossip val localGossip = latestGossip - if (envelope.to != selfUniqueAddress) + if (envelope.to != selfUniqueAddress) { logInfo("Ignoring received gossip intended for someone else, from [{}] to [{}]", from.address, envelope.to) - if (remoteGossip.overview.unreachable.exists(_.address == selfAddress)) + Ignored + } else if (remoteGossip.overview.unreachable.exists(_.address == selfAddress)) { logInfo("Ignoring received gossip with myself as unreachable, from [{}]", from.address) - else if (localGossip.overview.unreachable.exists(_.uniqueAddress == from)) + Ignored + } else if (localGossip.overview.unreachable.exists(_.uniqueAddress == from)) { logInfo("Ignoring received gossip from unreachable [{}] ", from) - else if (localGossip.members.forall(_.uniqueAddress != from)) + Ignored + } else if (localGossip.members.forall(_.uniqueAddress != from)) { logInfo("Ignoring received gossip from unknown [{}]", from) - else if (remoteGossip.members.forall(_.uniqueAddress != selfUniqueAddress)) + Ignored + } else if (remoteGossip.members.forall(_.uniqueAddress != selfUniqueAddress)) { logInfo("Ignoring received gossip that does not contain myself, from [{}]", from) - else { + Ignored + } else { + val comparison = remoteGossip.version compareTo localGossip.version - val comparison = remoteGossip.version tryCompareTo localGossip.version - - val (winningGossip, talkback) = comparison match { - case None ⇒ + val (winningGossip, talkback, gossipType) = comparison match { + case VectorClock.Concurrent ⇒ // conflicting versions, merge - (remoteGossip merge localGossip, true) - case Some(0) ⇒ + (remoteGossip merge localGossip, true, Merge) + case VectorClock.Same ⇒ // same version - (remoteGossip mergeSeen localGossip, !remoteGossip.seenByNode(selfUniqueAddress)) - case Some(x) if x < 0 ⇒ + (remoteGossip mergeSeen localGossip, !remoteGossip.seenByNode(selfUniqueAddress), Same) + case VectorClock.Before ⇒ // local is newer - (localGossip, true) - case _ ⇒ + (localGossip, true, Older) + case VectorClock.After ⇒ // remote is newer - (remoteGossip, !remoteGossip.seenByNode(selfUniqueAddress)) + (remoteGossip, !remoteGossip.seenByNode(selfUniqueAddress), Newer) } latestGossip = winningGossip seen selfUniqueAddress @@ -603,18 +618,18 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from) - if (comparison.isEmpty) { + if (comparison == VectorClock.Concurrent) { log.debug( """Couldn't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merged them into [{}]""", remoteGossip, localGossip, winningGossip) } if (statsEnabled) { - gossipStats = comparison match { - case None ⇒ gossipStats.incrementMergeCount - case Some(0) ⇒ gossipStats.incrementSameCount - case Some(x) if x < 0 ⇒ gossipStats.incrementNewerCount - case _ ⇒ gossipStats.incrementOlderCount + gossipStats = gossipType match { + case Merge ⇒ gossipStats.incrementMergeCount + case Same ⇒ gossipStats.incrementSameCount + case Newer ⇒ gossipStats.incrementNewerCount + case Older ⇒ gossipStats.incrementOlderCount } vclockStats = VectorClockStats( versionSize = latestGossip.version.versions.size, @@ -630,6 +645,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto // older or sender had newer gossipTo(from, sender) } + gossipType } } diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index 4bae2006bc..fa51fa1adb 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -5,6 +5,7 @@ package akka.cluster import scala.collection.immutable +import scala.collection.immutable.TreeMap import MemberStatus._ /** @@ -61,8 +62,7 @@ private[cluster] object Gossip { private[cluster] case class Gossip( members: immutable.SortedSet[Member], // sorted set of members with their status, sorted by address overview: GossipOverview = GossipOverview(), - version: VectorClock = VectorClock()) // vector clock version - extends Versioned[Gossip] { + version: VectorClock = VectorClock()) { // vector clock version // TODO can be disabled as optimization assertInvariants() @@ -125,8 +125,10 @@ private[cluster] case class Gossip( overview.seen.get(node).exists(_ == version) } - private def mergeSeenTables(allowed: Set[Member], one: Map[UniqueAddress, VectorClock], another: Map[UniqueAddress, VectorClock]): Map[UniqueAddress, VectorClock] = { - (Map.empty[UniqueAddress, VectorClock] /: allowed) { + private def mergeSeenTables(allowed: immutable.SortedSet[Member], + one: TreeMap[UniqueAddress, VectorClock], + another: TreeMap[UniqueAddress, VectorClock]): TreeMap[UniqueAddress, VectorClock] = + (TreeMap.empty[UniqueAddress, VectorClock] /: allowed) { (merged, member) ⇒ val node = member.uniqueAddress (one.get(node), another.get(node)) match { @@ -134,14 +136,13 @@ private[cluster] case class Gossip( case (Some(v1), None) ⇒ merged.updated(node, v1) case (None, Some(v2)) ⇒ merged.updated(node, v2) case (Some(v1), Some(v2)) ⇒ - v1 tryCompareTo v2 match { - case None ⇒ merged - case Some(x) if x > 0 ⇒ merged.updated(node, v1) - case _ ⇒ merged.updated(node, v2) + v1 compareTo v2 match { + case VectorClock.Same | VectorClock.After ⇒ merged.updated(node, v1) + case VectorClock.Before ⇒ merged.updated(node, v2) + case VectorClock.Concurrent ⇒ merged } } } - } /** * Merges the seen table of two Gossip instances. @@ -236,7 +237,7 @@ private[cluster] case class Gossip( */ @SerialVersionUID(1L) private[cluster] case class GossipOverview( - seen: Map[UniqueAddress, VectorClock] = Map.empty, + seen: TreeMap[UniqueAddress, VectorClock] = TreeMap.empty, unreachable: Set[Member] = Set.empty) { override def toString = diff --git a/akka-cluster/src/main/scala/akka/cluster/Member.scala b/akka-cluster/src/main/scala/akka/cluster/Member.scala index 9d0ded1a34..2fa1cfdecf 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Member.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Member.scala @@ -118,13 +118,7 @@ object Member { */ implicit val ordering: Ordering[Member] = new Ordering[Member] { def compare(a: Member, b: Member): Int = { - val result = addressOrdering.compare(a.address, b.address) - if (result == 0) { - val aUid = a.uniqueAddress.uid - val bUid = b.uniqueAddress.uid - if (aUid < bUid) -1 else if (aUid == bUid) 0 else 1 - } else - result + a.uniqueAddress compare b.uniqueAddress } } @@ -218,4 +212,13 @@ object MemberStatus { * INTERNAL API */ @SerialVersionUID(1L) -private[cluster] case class UniqueAddress(address: Address, uid: Int) +private[cluster] case class UniqueAddress(address: Address, uid: Int) extends Ordered[UniqueAddress] { + @transient + override lazy val hashCode = scala.util.hashing.MurmurHash3.productHash(this) + + override def compare(that: UniqueAddress): Int = { + val result = Member.addressOrdering.compare(this.address, that.address) + if (result == 0) if (this.uid < that.uid) -1 else if (this.uid == that.uid) 0 else 1 + else result + } +} diff --git a/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala b/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala index 8e53416810..1690f1cd73 100644 --- a/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala +++ b/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala @@ -5,87 +5,29 @@ package akka.cluster import akka.AkkaException -import akka.event.Logging -import akka.actor.ActorSystem import System.{ currentTimeMillis ⇒ newTimestamp } import java.security.MessageDigest import java.util.concurrent.atomic.AtomicLong - -class VectorClockException(message: String) extends AkkaException(message) - -/** - * Trait to be extended by classes that wants to be versioned using a VectorClock. - */ -trait Versioned[T] { - def version: VectorClock - def :+(node: VectorClock.Node): T -} - -/** - * Utility methods for comparing Versioned instances. - */ -object Versioned { - - /** - * The result of comparing two Versioned objects. - * Either: - * {{{ - * 1) v1 is BEFORE v2 => Before - * 2) v1 is AFTER t2 => After - * 3) v1 happens CONCURRENTLY to v2 => Concurrent - * }}} - */ - sealed trait Ordering - case object Before extends Ordering - case object After extends Ordering - case object Concurrent extends Ordering - - /** - * Returns or 'Ordering' for the two 'Versioned' instances. - */ - def compare[T <: Versioned[T]](versioned1: Versioned[T], versioned2: Versioned[T]): Ordering = { - if (versioned1.version <> versioned2.version) Concurrent - else if (versioned1.version < versioned2.version) Before - else After - } - - /** - * Returns the Versioned that have the latest version. - */ - def latestVersionOf[T <: Versioned[T]](versioned1: T, versioned2: T): T = { - compare(versioned1, versioned2) match { - case Concurrent ⇒ versioned2 - case Before ⇒ versioned2 - case After ⇒ versioned1 - } - } -} +import scala.collection.immutable.TreeMap /** * VectorClock module with helper classes and methods. * * Based on code from the 'vlock' VectorClock library by Coda Hale. */ -object VectorClock { +private[cluster] object VectorClock { /** * Hash representation of a versioned node name. */ - sealed trait Node extends Serializable { - def hash: String - } + type Node = String object Node { - @SerialVersionUID(1L) - private case class NodeImpl(name: String) extends Node { - override def toString(): String = "Node(" + name + ")" - override def hash: String = name - } - def apply(name: String): Node = NodeImpl(hash(name)) + def apply(name: String): Node = hash(name) - def fromHash(hash: String): Node = NodeImpl(hash) + def fromHash(hash: String): Node = hash private def hash(name: String): String = { val digester = MessageDigest.getInstance("MD5") @@ -98,11 +40,8 @@ object VectorClock { * Timestamp representation a unique 'Ordered' timestamp. */ @SerialVersionUID(1L) - case class Timestamp(time: Long) extends Ordered[Timestamp] { - def max(other: Timestamp) = { - if (this < other) other - else this - } + final case class Timestamp(time: Long) extends Ordered[Timestamp] { + def max(other: Timestamp) = if (this < other) other else this def compare(other: Timestamp) = time compare other.time @@ -127,6 +66,12 @@ object VectorClock { new Timestamp(newTime) } } + + sealed trait Ordering + case object After extends Ordering + case object Before extends Ordering + case object Same extends Ordering + case object Concurrent extends Ordering } /** @@ -141,9 +86,7 @@ object VectorClock { */ @SerialVersionUID(1L) case class VectorClock( - timestamp: VectorClock.Timestamp = VectorClock.Timestamp(), - versions: Map[VectorClock.Node, VectorClock.Timestamp] = Map.empty[VectorClock.Node, VectorClock.Timestamp]) - extends PartiallyOrdered[VectorClock] { + versions: TreeMap[VectorClock.Node, VectorClock.Timestamp] = TreeMap.empty[VectorClock.Node, VectorClock.Timestamp]) { import VectorClock._ @@ -155,7 +98,17 @@ case class VectorClock( /** * Returns true if this and that are concurrent else false. */ - def <>(that: VectorClock): Boolean = tryCompareTo(that) == None + def <>(that: VectorClock): Boolean = compareTo(that) == Concurrent + + /** + * Returns true if this is before that else false. + */ + def <(that: VectorClock): Boolean = compareTo(that) == Before + + /** + * Returns true if this is after that else false. + */ + def >(that: VectorClock): Boolean = compareTo(that) == After /** * Returns true if this VectorClock has the same history as the 'that' VectorClock else false. @@ -163,39 +116,37 @@ case class VectorClock( def ==(that: VectorClock): Boolean = versions == that.versions /** - * For the 'PartiallyOrdered' trait, to allow natural comparisons using <, > and ==. - *

- * Compare two vector clocks. The outcomes will be one of the following: + * Compare two vector clocks. The outcome will be one of the following: *

* {{{ - * 1. Clock 1 is BEFORE (>) Clock 2 if there exists an i such that c1(i) <= c2(i) and there does not exist a j such that c1(j) > c2(j). - * 2. Clock 1 is CONCURRENT (<>) to Clock 2 if there exists an i, j such that c1(i) < c2(i) and c1(j) > c2(j). - * 3. Clock 1 is AFTER (<) Clock 2 otherwise. + * 1. Clock 1 is SAME (==) as Clock 2 iff for all i c1(i) == c2(i) + * 2. Clock 1 is BEFORE (<) Clock 2 iff for all i c1(i) <= c2(i) and there exist a j such that c1(j) < c2(j) + * 3. Clock 1 is AFTER (>) Clock 2 iff for all i c1(i) >= c2(i) and there exist a j such that c1(j) > c2(j). + * 4. Clock 1 is CONCURRENT (<>) to Clock 2 otherwise. * }}} */ - def tryCompareTo[V >: VectorClock <% PartiallyOrdered[V]](vclock: V): Option[Int] = { - def compare(versions1: Map[Node, Timestamp], versions2: Map[Node, Timestamp]): Boolean = { - versions1.forall { case ((n, t)) ⇒ t <= versions2.getOrElse(n, Timestamp.zero) } && - (versions1.exists { case ((n, t)) ⇒ t < versions2.getOrElse(n, Timestamp.zero) } || - (versions1.size < versions2.size)) - } - vclock match { - case VectorClock(_, otherVersions) ⇒ - if (compare(versions, otherVersions)) Some(-1) - else if (compare(otherVersions, versions)) Some(1) - else if (versions == otherVersions) Some(0) - else None - case _ ⇒ None + def compareTo(that: VectorClock): Ordering = { + def olderOrSameIn(version: Pair[Node, Timestamp], versions: Map[Node, Timestamp]): Boolean = { + version match { case (n, t) ⇒ t <= versions(n) } } + + if (versions == that.versions) Same + else if (versions.forall(olderOrSameIn(_, that.versions.withDefaultValue(Timestamp.zero)))) Before + else if (that.versions.forall(olderOrSameIn(_, versions.withDefaultValue(Timestamp.zero)))) After + else Concurrent } /** * Merges this VectorClock with another VectorClock. E.g. merges its versioned history. */ def merge(that: VectorClock): VectorClock = { - val mergedVersions = scala.collection.mutable.Map.empty[Node, Timestamp] ++ that.versions - for ((node, time) ← versions) mergedVersions(node) = time max mergedVersions.getOrElse(node, time) - VectorClock(timestamp, Map.empty[Node, Timestamp] ++ mergedVersions) + var mergedVersions = that.versions + for ((node, time) ← versions) { + val mergedVersionsCurrentTime = mergedVersions.getOrElse(node, Timestamp.zero) + if (time > mergedVersionsCurrentTime) + mergedVersions = mergedVersions.updated(node, time) + } + VectorClock(mergedVersions) } override def toString = versions.map { case ((n, t)) ⇒ n + " -> " + t }.mkString("VectorClock(", ", ", ")") diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala index 2736ebd927..f02653bca4 100644 --- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala @@ -6,6 +6,7 @@ package akka.cluster.protobuf import akka.serialization.Serializer import akka.cluster._ import scala.collection.breakOut +import scala.collection.immutable.TreeMap import akka.actor.{ ExtendedActorSystem, Address } import scala.Some import scala.collection.immutable @@ -60,7 +61,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ case m: MetricsGossipEnvelope ⇒ compress(metricsGossipEnvelopeToProto(m)) case InternalClusterAction.Join(node, roles) ⇒ - msg.Join(uniqueAddressToProto(node), roles.map(identity)(breakOut): Vector[String]).toByteArray + msg.Join(uniqueAddressToProto(node), roles.toVector).toByteArray case InternalClusterAction.Welcome(from, gossip) ⇒ compress(msg.Welcome(uniqueAddressToProto(from), gossipToProto(gossip))) case ClusterUserAction.Leave(address) ⇒ @@ -162,8 +163,8 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ val addressMapping = allAddresses.zipWithIndex.toMap val allRoles = allMembers.foldLeft(Set.empty[String])((acc, m) ⇒ acc ++ m.roles).to[Vector] val roleMapping = allRoles.zipWithIndex.toMap - val allHashes = gossip.overview.seen.values.foldLeft(gossip.version.versions.keys.map(_.hash).toSet) { - case (s, VectorClock(t, v)) ⇒ s ++ v.keys.map(_.hash) + val allHashes = gossip.overview.seen.values.foldLeft(gossip.version.versions.keys.toSet) { + case (s, VectorClock(v)) ⇒ s ++ v.keys }.to[Vector] val hashMapping = allHashes.zipWithIndex.toMap @@ -192,8 +193,8 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ private def vectorClockToProto(version: VectorClock, hashMapping: Map[String, Int]): msg.VectorClock = { def mapHash(hash: String) = mapWithErrorMessage(hashMapping, hash, "hash") - msg.VectorClock(version.timestamp.time, - version.versions.map { case (n, t) ⇒ msg.VectorClock.Version(mapHash(n.hash), t.time) }.to[Vector]) + msg.VectorClock(None, + version.versions.map { case (n, t) ⇒ msg.VectorClock.Version(mapHash(n), t.time) }.to[Vector]) } private def gossipEnvelopeToProto(envelope: GossipEnvelope): msg.GossipEnvelope = { @@ -202,7 +203,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ } private def gossipStatusToProto(status: GossipStatus): msg.GossipStatus = { - val allHashes: Vector[String] = status.version.versions.keys.map(_.hash)(collection.breakOut) + val allHashes: Vector[String] = status.version.versions.keys.toVector val hashMapping = allHashes.zipWithIndex.toMap msg.GossipStatus(uniqueAddressToProto(status.from), allHashes, vectorClockToProto(status.version, hashMapping)) } @@ -230,18 +231,17 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ val members = gossip.members.map(memberFromProto).to[immutable.SortedSet] val unreachable = gossip.overview.unreachable.map(memberFromProto).toSet - val seen = gossip.overview.seen.map(seenFromProto).toMap + val seen = gossip.overview.seen.map(seenFromProto)(breakOut): TreeMap[UniqueAddress, VectorClock] val overview = GossipOverview(seen, unreachable) Gossip(members, overview, vectorClockFromProto(gossip.version, hashMapping)) } private def vectorClockFromProto(version: msg.VectorClock, hashMapping: immutable.Seq[String]) = { - VectorClock(VectorClock.Timestamp(version.timestamp), - version.versions.map { - case msg.VectorClock.Version(h, t) ⇒ - (VectorClock.Node.fromHash(hashMapping(h)), VectorClock.Timestamp(t)) - }.toMap) + VectorClock(version.versions.map({ + case msg.VectorClock.Version(h, t) ⇒ + (VectorClock.Node.fromHash(hashMapping(h)), VectorClock.Timestamp(t)) + })(breakOut): TreeMap[VectorClock.Node, VectorClock.Timestamp]) } private def gossipEnvelopeFromProto(envelope: msg.GossipEnvelope): GossipEnvelope = { diff --git a/akka-cluster/src/test/scala/akka/cluster/VectorClockSpec.scala b/akka-cluster/src/test/scala/akka/cluster/VectorClockSpec.scala index fad1665e5f..4738b3b523 100644 --- a/akka-cluster/src/test/scala/akka/cluster/VectorClockSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/VectorClockSpec.scala @@ -199,7 +199,6 @@ class VectorClockSpec extends AkkaSpec { "pass blank clock incrementing" in { val node1 = Node("1") val node2 = Node("2") - val node3 = Node("3") val v1 = VectorClock() val v2 = VectorClock() @@ -236,84 +235,4 @@ class VectorClockSpec extends AkkaSpec { (c1 > b1) must equal(true) } } - - "An instance of Versioned" must { - class TestVersioned(val version: VectorClock = VectorClock()) extends Versioned[TestVersioned] { - def :+(node: Node): TestVersioned = new TestVersioned(version :+ node) - } - - import Versioned.latestVersionOf - - "have zero versions when created" in { - val versioned = new TestVersioned() - versioned.version.versions must be(Map()) - } - - "happen before an identical versioned with a single additional event" in { - val versioned1_1 = new TestVersioned() - val versioned2_1 = versioned1_1 :+ Node("1") - val versioned3_1 = versioned2_1 :+ Node("2") - val versioned4_1 = versioned3_1 :+ Node("1") - - val versioned1_2 = new TestVersioned() - val versioned2_2 = versioned1_2 :+ Node("1") - val versioned3_2 = versioned2_2 :+ Node("2") - val versioned4_2 = versioned3_2 :+ Node("1") - val versioned5_2 = versioned4_2 :+ Node("3") - - latestVersionOf[TestVersioned](versioned4_1, versioned5_2) must be(versioned5_2) - } - - "pass misc comparison test 1" in { - var versioned1_1 = new TestVersioned() - val versioned2_1 = versioned1_1 :+ Node("1") - - val versioned1_2 = new TestVersioned() - val versioned2_2 = versioned1_2 :+ Node("2") - - latestVersionOf[TestVersioned](versioned2_1, versioned2_2) must be(versioned2_2) - } - - "pass misc comparison test 2" in { - val versioned1_3 = new TestVersioned() - val versioned2_3 = versioned1_3 :+ Node("1") - val versioned3_3 = versioned2_3 :+ Node("2") - val versioned4_3 = versioned3_3 :+ Node("1") - - val versioned1_4 = new TestVersioned() - val versioned2_4 = versioned1_4 :+ Node("1") - val versioned3_4 = versioned2_4 :+ Node("1") - val versioned4_4 = versioned3_4 :+ Node("3") - - latestVersionOf[TestVersioned](versioned4_3, versioned4_4) must be(versioned4_4) - } - - "pass misc comparison test 3" in { - val versioned1_1 = new TestVersioned() - val versioned2_1 = versioned1_1 :+ Node("2") - val versioned3_1 = versioned2_1 :+ Node("2") - - val versioned1_2 = new TestVersioned() - val versioned2_2 = versioned1_2 :+ Node("1") - val versioned3_2 = versioned2_2 :+ Node("2") - val versioned4_2 = versioned3_2 :+ Node("2") - val versioned5_2 = versioned4_2 :+ Node("3") - - latestVersionOf[TestVersioned](versioned3_1, versioned5_2) must be(versioned5_2) - } - - "pass misc comparison test 4" in { - val versioned1_1 = new TestVersioned() - val versioned2_1 = versioned1_1 :+ Node("1") - val versioned3_1 = versioned2_1 :+ Node("2") - val versioned4_1 = versioned3_1 :+ Node("2") - val versioned5_1 = versioned4_1 :+ Node("3") - - val versioned1_2 = new TestVersioned() - val versioned2_2 = versioned1_2 :+ Node("2") - val versioned3_2 = versioned2_2 :+ Node("2") - - latestVersionOf[TestVersioned](versioned5_1, versioned3_2) must be(versioned3_2) - } - } }