diff --git a/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala b/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala index f4baa555b3..c2cc2d5a55 100644 --- a/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala +++ b/akka-cluster/src/main/scala/akka/cluster/VectorClock.scala @@ -37,35 +37,9 @@ private[cluster] object VectorClock { } } - /** - * Timestamp representation a unique 'Ordered' timestamp. - */ - @SerialVersionUID(1L) - final case class Timestamp(time: Long) extends Ordered[Timestamp] { - def max(other: Timestamp) = if (this < other) other else this - - def compare(other: Timestamp) = time compare other.time - - override def toString = "%016x" format time - } - object Timestamp { - private val counter = new AtomicLong(newTimestamp) - - val zero: Timestamp = Timestamp(0L) - - def apply(): Timestamp = { - var newTime: Long = 0L - while (newTime == 0) { - val last = counter.get - val current = newTimestamp - val next = if (current > last) current else last + 1 - if (counter.compareAndSet(last, next)) { - newTime = next - } - } - new Timestamp(newTime) - } + final val Zero = 0L + final val EndMarker = Long.MinValue } sealed trait Ordering @@ -81,7 +55,7 @@ private[cluster] object VectorClock { /** * Marker to signal that we have reached the end of a vector clock. */ - private val cmpEndMarker = (VectorClock.Node("endmarker"), Timestamp(Int.MinValue)) + private val cmpEndMarker = (VectorClock.Node("endmarker"), Timestamp.EndMarker) } @@ -97,14 +71,17 @@ private[cluster] object VectorClock { */ @SerialVersionUID(1L) case class VectorClock( - versions: TreeMap[VectorClock.Node, VectorClock.Timestamp] = TreeMap.empty[VectorClock.Node, VectorClock.Timestamp]) { + versions: TreeMap[VectorClock.Node, Long] = TreeMap.empty[VectorClock.Node, Long]) { import VectorClock._ /** * Increment the version for the node passed as argument. Returns a new VectorClock. */ - def :+(node: Node): VectorClock = copy(versions = versions + (node -> Timestamp())) + def :+(node: Node): VectorClock = { + val currentTimestamp = versions.getOrElse(node, Timestamp.Zero) + copy(versions = versions.updated(node, currentTimestamp + 1)) + } /** * Returns true if this and that are concurrent else false. @@ -140,9 +117,9 @@ case class VectorClock( private final def compareOnlyTo(that: VectorClock, order: Ordering): Ordering = { def nextOrElse[T](iter: Iterator[T], default: T): T = if (iter.hasNext) iter.next() else default - def compare(i1: Iterator[(Node, Timestamp)], i2: Iterator[(Node, Timestamp)], requestedOrder: Ordering): Ordering = { + def compare(i1: Iterator[(Node, Long)], i2: Iterator[(Node, Long)], requestedOrder: Ordering): Ordering = { @tailrec - def compareNext(nt1: (Node, Timestamp), nt2: (Node, Timestamp), currentOrder: Ordering): Ordering = + def compareNext(nt1: (Node, Long), nt2: (Node, Long), currentOrder: Ordering): Ordering = if ((requestedOrder ne FullOrder) && (currentOrder ne Same) && (currentOrder ne requestedOrder)) currentOrder else if ((nt1 eq cmpEndMarker) && (nt2 eq cmpEndMarker)) currentOrder // i1 is empty but i2 is not, so i1 can only be Before @@ -154,10 +131,9 @@ case class VectorClock( val nc = nt1._1 compareTo nt2._1 if (nc == 0) { // both nodes exist compare the timestamps - val tc = nt1._2 compareTo nt2._2 // same timestamp so just continue with the next nodes - if (tc == 0) compareNext(nextOrElse(i1, cmpEndMarker), nextOrElse(i2, cmpEndMarker), currentOrder) - else if (tc < 0) { + if (nt1._2 == nt2._2) compareNext(nextOrElse(i1, cmpEndMarker), nextOrElse(i2, cmpEndMarker), currentOrder) + else if (nt1._2 < nt2._2) { // t1 is less than t2, so i1 can only be Before if (currentOrder eq After) Concurrent else compareNext(nextOrElse(i1, cmpEndMarker), nextOrElse(i2, cmpEndMarker), Before) @@ -204,7 +180,7 @@ case class VectorClock( def merge(that: VectorClock): VectorClock = { var mergedVersions = that.versions for ((node, time) ← versions) { - val mergedVersionsCurrentTime = mergedVersions.getOrElse(node, Timestamp.zero) + val mergedVersionsCurrentTime = mergedVersions.getOrElse(node, Timestamp.Zero) if (time > mergedVersionsCurrentTime) mergedVersions = mergedVersions.updated(node, time) } diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala index 47babf0a0b..cbe0252e2b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala @@ -185,7 +185,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ private def vectorClockToProto(version: VectorClock, hashMapping: Map[String, Int]): msg.VectorClock = { val versions: Vector[msg.VectorClock.Version] = version.versions.map({ - case (n, t) ⇒ msg.VectorClock.Version(mapWithErrorMessage(hashMapping, n, "hash"), t.time) + case (n, t) ⇒ msg.VectorClock.Version(mapWithErrorMessage(hashMapping, n, "hash"), t) })(breakOut) msg.VectorClock(None, versions) } @@ -242,7 +242,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends Serializ private def vectorClockFromProto(version: msg.VectorClock, hashMapping: immutable.Seq[String]) = { VectorClock(version.versions.map({ - case msg.VectorClock.Version(h, t) ⇒ (VectorClock.Node.fromHash(hashMapping(h)), VectorClock.Timestamp(t)) + case msg.VectorClock.Version(h, t) ⇒ (VectorClock.Node.fromHash(hashMapping(h)), t) })(breakOut)) } diff --git a/akka-cluster/src/test/scala/akka/cluster/VectorClockPerfSpec.scala b/akka-cluster/src/test/scala/akka/cluster/VectorClockPerfSpec.scala index fbfd234c0b..490e95a947 100644 --- a/akka-cluster/src/test/scala/akka/cluster/VectorClockPerfSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/VectorClockPerfSpec.scala @@ -20,8 +20,8 @@ object VectorClockPerfSpec { } def copyVectorClock(vc: VectorClock): VectorClock = { - val versions = (TreeMap.empty[Node, Timestamp] /: vc.versions) { - case (versions, (n, t)) ⇒ versions.updated(Node.fromHash(n), Timestamp(t.time)) + val versions = (TreeMap.empty[Node, Long] /: vc.versions) { + case (versions, (n, t)) ⇒ versions.updated(Node.fromHash(n), t) } vc.copy(versions = versions) } @@ -42,10 +42,10 @@ class VectorClockPerfSpec extends WordSpec with ShouldMatchers { val middleNode = nodes.drop(clockSize / 2).head val vcBaseLast = vcBefore :+ lastNode val vcAfterLast = vcBaseLast :+ firstNode - val vcConcurrentLast = vcBefore :+ lastNode + val vcConcurrentLast = vcBaseLast :+ lastNode val vcBaseMiddle = vcBefore :+ middleNode val vcAfterMiddle = vcBaseMiddle :+ firstNode - val vcConcurrentMiddle = vcBefore :+ middleNode + val vcConcurrentMiddle = vcBaseMiddle :+ middleNode def checkThunkFor(vc1: VectorClock, vc2: VectorClock, thunk: (VectorClock, VectorClock) ⇒ Unit, times: Int): Unit = { val vcc1 = copyVectorClock(vc1) diff --git a/akka-cluster/src/test/scala/akka/cluster/VectorClockSpec.scala b/akka-cluster/src/test/scala/akka/cluster/VectorClockSpec.scala index de261d2119..b20ad33871 100644 --- a/akka-cluster/src/test/scala/akka/cluster/VectorClockSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/VectorClockSpec.scala @@ -111,7 +111,7 @@ class VectorClockSpec extends AkkaSpec { val clock4_1 = clock3_1 :+ Node("2") val clock5_1 = clock4_1 :+ Node("3") - val clock1_2 = VectorClock() + val clock1_2 = clock4_1 val clock2_2 = clock1_2 :+ Node("2") val clock3_2 = clock2_2 :+ Node("2") @@ -143,7 +143,7 @@ class VectorClockSpec extends AkkaSpec { val clock4_1 = clock3_1 :+ node2 val clock5_1 = clock4_1 :+ node3 - val clock1_2 = VectorClock() + val clock1_2 = clock4_1 val clock2_2 = clock1_2 :+ node2 val clock3_2 = clock2_2 :+ node2