diff --git a/akka-remote/src/main/scala/akka/remote/VectorClock.scala b/akka-remote/src/main/scala/akka/remote/VectorClock.scala index e0373a21e3..9515a859e2 100644 --- a/akka-remote/src/main/scala/akka/remote/VectorClock.scala +++ b/akka-remote/src/main/scala/akka/remote/VectorClock.scala @@ -9,8 +9,11 @@ import akka.AkkaException class VectorClockException(message: String) extends AkkaException(message) /** - * Representation of a Vector-based clock (counting clock). - * For details see Wikipedia: [http://en.wikipedia.org/wiki/Vector_clock]. + * Representation of a Vector-based clock (counting clock), inspired by Lamport logical clocks. + * + * Reference: + * Leslie Lamport (1978). "Time, clocks, and the ordering of events in a distributed system". Communications of the ACM 21 (7): 558-565. + * Friedemann Mattern (1988). "Virtual Time and Global States of Distributed Systems". Workshop on Parallel and Distributed Algorithms: pp. 215-226 */ case class VectorClock( versions: Vector[VectorClock.Entry] = Vector.empty[VectorClock.Entry], @@ -19,17 +22,17 @@ case class VectorClock( def compare(other: VectorClock): Ordering = VectorClock.compare(this, other) - def incrementVersionForNode(nodeId: Int, timestamp: Long): VectorClock = { + def increment(fingerprint: Int, timestamp: Long): VectorClock = { val newVersions = - if (versions exists (entry ⇒ entry.nodeId == nodeId)) { + if (versions exists (entry ⇒ entry.fingerprint == fingerprint)) { // update existing node entry versions map { entry ⇒ - if (entry.nodeId == nodeId) entry.increment() + if (entry.fingerprint == fingerprint) entry.increment() else entry } } else { // create and append a new node entry - versions :+ Entry(nodeId = nodeId) + versions :+ Entry(fingerprint = fingerprint) } if (newVersions.size > MaxNrOfVersions) throw new VectorClockException("Max number of versions reached") copy(versions = newVersions, timestamp = timestamp) @@ -37,6 +40,7 @@ case class VectorClock( def maxVersion: Long = versions.foldLeft(1L)((max, entry) ⇒ math.max(max, entry.version)) + // FIXME implement VectorClock.merge def merge(other: VectorClock): VectorClock = { sys.error("Not implemented") } @@ -58,13 +62,13 @@ object VectorClock { sealed trait Ordering case object Before extends Ordering case object After extends Ordering - case object Concurrently extends Ordering + case object Concurrent extends Ordering /** * Versioned entry in a vector clock. */ - case class Entry(nodeId: Int, version: Long = 1) { - def increment(): Entry = copy(version = version + 1) + case class Entry(fingerprint: Int, version: Long = 1L) { + def increment(): Entry = copy(version = version + 1L) } /** @@ -88,12 +92,12 @@ object VectorClock { while (p1 < v1.versions.size && p2 < v2.versions.size) { val ver1 = v1.versions(p1) val ver2 = v2.versions(p2) - if (ver1.nodeId == ver2.nodeId) { + if (ver1.fingerprint == ver2.fingerprint) { if (ver1.version > ver2.version) v1Bigger = true else if (ver2.version > ver1.version) v2Bigger = true p1 += 1 p2 += 1 - } else if (ver1.nodeId > ver2.nodeId) { + } else if (ver1.fingerprint > ver2.fingerprint) { v2Bigger = true // Since ver1 is bigger that means it is missing a version that ver2 has p2 += 1 } else { @@ -108,6 +112,6 @@ object VectorClock { if (!v1Bigger && !v2Bigger) Before // This is the case where they are equal, return BEFORE arbitrarily else if (v1Bigger && !v2Bigger) After // This is the case where v1 is a successor clock to v2 else if (!v1Bigger && v2Bigger) Before // This is the case where v2 is a successor clock to v1 - else Concurrently // This is the case where both clocks are parallel to one another + else Concurrent // This is the case where both clocks are parallel to one another } } diff --git a/akka-remote/src/test/scala/akka/remote/VectorClockSpec.scala b/akka-remote/src/test/scala/akka/remote/VectorClockSpec.scala index 10b8ea1714..1a65a4cb78 100644 --- a/akka-remote/src/test/scala/akka/remote/VectorClockSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/VectorClockSpec.scala @@ -18,19 +18,19 @@ class VectorClockSpec extends WordSpec with MustMatchers { "be able to add Entry if non-existing" in { val clock1 = VectorClock() clock1.versions must be(Vector()) - val clock2 = clock1.incrementVersionForNode(1, System.currentTimeMillis) - val clock3 = clock2.incrementVersionForNode(2, System.currentTimeMillis) + val clock2 = clock1.increment(1, System.currentTimeMillis) + val clock3 = clock2.increment(2, System.currentTimeMillis) clock3.versions must be(Vector(Entry(1, 1), Entry(2, 1))) } "be able to increment version of existing Entry" in { val clock1 = VectorClock() - val clock2 = clock1.incrementVersionForNode(1, System.currentTimeMillis) - val clock3 = clock2.incrementVersionForNode(2, System.currentTimeMillis) - val clock4 = clock3.incrementVersionForNode(1, System.currentTimeMillis) - val clock5 = clock4.incrementVersionForNode(2, System.currentTimeMillis) - val clock6 = clock5.incrementVersionForNode(2, System.currentTimeMillis) + val clock2 = clock1.increment(1, System.currentTimeMillis) + val clock3 = clock2.increment(2, System.currentTimeMillis) + val clock4 = clock3.increment(1, System.currentTimeMillis) + val clock5 = clock4.increment(2, System.currentTimeMillis) + val clock6 = clock5.increment(2, System.currentTimeMillis) clock6.versions must be(Vector(Entry(1, 2), Entry(2, 3))) } @@ -39,86 +39,86 @@ class VectorClockSpec extends WordSpec with MustMatchers { val clock1 = VectorClock() val clock2 = VectorClock() - clock1.compare(clock2) must not be (Concurrently) + clock1.compare(clock2) must not be (Concurrent) } "A clock should not happen before an identical clock" in { val clock1_1 = VectorClock() - val clock2_1 = clock1_1.incrementVersionForNode(1, System.currentTimeMillis) - val clock3_1 = clock2_1.incrementVersionForNode(2, System.currentTimeMillis) - val clock4_1 = clock3_1.incrementVersionForNode(1, System.currentTimeMillis) + val clock2_1 = clock1_1.increment(1, System.currentTimeMillis) + val clock3_1 = clock2_1.increment(2, System.currentTimeMillis) + val clock4_1 = clock3_1.increment(1, System.currentTimeMillis) val clock1_2 = VectorClock() - val clock2_2 = clock1_2.incrementVersionForNode(1, System.currentTimeMillis) - val clock3_2 = clock2_2.incrementVersionForNode(2, System.currentTimeMillis) - val clock4_2 = clock3_2.incrementVersionForNode(1, System.currentTimeMillis) + val clock2_2 = clock1_2.increment(1, System.currentTimeMillis) + val clock3_2 = clock2_2.increment(2, System.currentTimeMillis) + val clock4_2 = clock3_2.increment(1, System.currentTimeMillis) - clock4_1.compare(clock4_2) must not be (Concurrently) + clock4_1.compare(clock4_2) must not be (Concurrent) } "A clock should happen before an identical clock with a single additional event" in { val clock1_1 = VectorClock() - val clock2_1 = clock1_1.incrementVersionForNode(1, System.currentTimeMillis) - val clock3_1 = clock2_1.incrementVersionForNode(2, System.currentTimeMillis) - val clock4_1 = clock3_1.incrementVersionForNode(1, System.currentTimeMillis) + val clock2_1 = clock1_1.increment(1, System.currentTimeMillis) + val clock3_1 = clock2_1.increment(2, System.currentTimeMillis) + val clock4_1 = clock3_1.increment(1, System.currentTimeMillis) val clock1_2 = VectorClock() - val clock2_2 = clock1_2.incrementVersionForNode(1, System.currentTimeMillis) - val clock3_2 = clock2_2.incrementVersionForNode(2, System.currentTimeMillis) - val clock4_2 = clock3_2.incrementVersionForNode(1, System.currentTimeMillis) - val clock5_2 = clock4_2.incrementVersionForNode(3, System.currentTimeMillis) + val clock2_2 = clock1_2.increment(1, System.currentTimeMillis) + val clock3_2 = clock2_2.increment(2, System.currentTimeMillis) + val clock4_2 = clock3_2.increment(1, System.currentTimeMillis) + val clock5_2 = clock4_2.increment(3, System.currentTimeMillis) clock4_1.compare(clock5_2) must be(Before) } "Two clocks with different events should be concurrent: 1" in { var clock1_1 = VectorClock() - val clock2_1 = clock1_1.incrementVersionForNode(1, System.currentTimeMillis) + val clock2_1 = clock1_1.increment(1, System.currentTimeMillis) val clock1_2 = VectorClock() - val clock2_2 = clock1_2.incrementVersionForNode(2, System.currentTimeMillis) + val clock2_2 = clock1_2.increment(2, System.currentTimeMillis) - clock2_1.compare(clock2_2) must be(Concurrently) + clock2_1.compare(clock2_2) must be(Concurrent) } "Two clocks with different events should be concurrent: 2" in { val clock1_3 = VectorClock() - val clock2_3 = clock1_3.incrementVersionForNode(1, System.currentTimeMillis) - val clock3_3 = clock2_3.incrementVersionForNode(2, System.currentTimeMillis) - val clock4_3 = clock3_3.incrementVersionForNode(1, System.currentTimeMillis) + val clock2_3 = clock1_3.increment(1, System.currentTimeMillis) + val clock3_3 = clock2_3.increment(2, System.currentTimeMillis) + val clock4_3 = clock3_3.increment(1, System.currentTimeMillis) val clock1_4 = VectorClock() - val clock2_4 = clock1_4.incrementVersionForNode(1, System.currentTimeMillis) - val clock3_4 = clock2_4.incrementVersionForNode(1, System.currentTimeMillis) - val clock4_4 = clock3_4.incrementVersionForNode(3, System.currentTimeMillis) + val clock2_4 = clock1_4.increment(1, System.currentTimeMillis) + val clock3_4 = clock2_4.increment(1, System.currentTimeMillis) + val clock4_4 = clock3_4.increment(3, System.currentTimeMillis) - clock4_3.compare(clock4_4) must be(Concurrently) + clock4_3.compare(clock4_4) must be(Concurrent) } ".." in { val clock1_1 = VectorClock() - val clock2_1 = clock1_1.incrementVersionForNode(2, System.currentTimeMillis) - val clock3_1 = clock2_1.incrementVersionForNode(2, System.currentTimeMillis) + val clock2_1 = clock1_1.increment(2, System.currentTimeMillis) + val clock3_1 = clock2_1.increment(2, System.currentTimeMillis) val clock1_2 = VectorClock() - val clock2_2 = clock1_2.incrementVersionForNode(1, System.currentTimeMillis) - val clock3_2 = clock2_2.incrementVersionForNode(2, System.currentTimeMillis) - val clock4_2 = clock3_2.incrementVersionForNode(2, System.currentTimeMillis) - val clock5_2 = clock4_2.incrementVersionForNode(3, System.currentTimeMillis) + val clock2_2 = clock1_2.increment(1, System.currentTimeMillis) + val clock3_2 = clock2_2.increment(2, System.currentTimeMillis) + val clock4_2 = clock3_2.increment(2, System.currentTimeMillis) + val clock5_2 = clock4_2.increment(3, System.currentTimeMillis) clock3_1.compare(clock5_2) must be(Before) } "..." in { val clock1_1 = VectorClock() - val clock2_1 = clock1_1.incrementVersionForNode(1, System.currentTimeMillis) - val clock3_1 = clock2_1.incrementVersionForNode(2, System.currentTimeMillis) - val clock4_1 = clock3_1.incrementVersionForNode(2, System.currentTimeMillis) - val clock5_1 = clock4_1.incrementVersionForNode(3, System.currentTimeMillis) + val clock2_1 = clock1_1.increment(1, System.currentTimeMillis) + val clock3_1 = clock2_1.increment(2, System.currentTimeMillis) + val clock4_1 = clock3_1.increment(2, System.currentTimeMillis) + val clock5_1 = clock4_1.increment(3, System.currentTimeMillis) val clock1_2 = VectorClock() - val clock2_2 = clock1_2.incrementVersionForNode(2, System.currentTimeMillis) - val clock3_2 = clock2_2.incrementVersionForNode(2, System.currentTimeMillis) + val clock2_2 = clock1_2.increment(2, System.currentTimeMillis) + val clock3_2 = clock2_2.increment(2, System.currentTimeMillis) clock5_1.compare(clock3_2) must be(After) }