diff --git a/akka-remote/src/main/scala/akka/remote/VectorClock.scala b/akka-remote/src/main/scala/akka/remote/VectorClock.scala index fde9bb84e7..42ea917669 100644 --- a/akka-remote/src/main/scala/akka/remote/VectorClock.scala +++ b/akka-remote/src/main/scala/akka/remote/VectorClock.scala @@ -8,6 +8,20 @@ import akka.AkkaException class VectorClockException(message: String) extends AkkaException(message) +trait Versioned { + def version: VectorClock +} + +object Versioned { + def latestVersionOf[T <: Versioned](versioned1: T, versioned2: T): T = { + (versioned1.version compare versioned2.version) match { + case VectorClock.Before ⇒ versioned2 // version 1 is BEFORE (older), use version 2 + case VectorClock.After ⇒ versioned1 // version 1 is AFTER (newer), use version 1 + case VectorClock.Concurrent ⇒ versioned1 // can't establish a causal relationship between versions => conflict - keeping version 1 + } + } +} + /** * Representation of a Vector-based clock (counting clock), inspired by Lamport logical clocks. * diff --git a/akka-remote/src/test/scala/akka/remote/VectorClockSpec.scala b/akka-remote/src/test/scala/akka/remote/VectorClockSpec.scala index 5bfda16666..03e4109423 100644 --- a/akka-remote/src/test/scala/akka/remote/VectorClockSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/VectorClockSpec.scala @@ -6,7 +6,7 @@ import akka.testkit.AkkaSpec class VectorClockSpec extends AkkaSpec { import VectorClock._ - "An VectorClock" must { + "A VectorClock" must { "have zero versions when created" in { val clock = VectorClock() @@ -40,7 +40,7 @@ class VectorClockSpec extends AkkaSpec { clock1.compare(clock2) must not be (Concurrent) } - "A clock should not happen before an identical clock" in { + "not happen before an identical clock" in { val clock1_1 = VectorClock() val clock2_1 = clock1_1.increment(1, System.currentTimeMillis) val clock3_1 = clock2_1.increment(2, System.currentTimeMillis) @@ -54,7 +54,7 @@ class VectorClockSpec extends AkkaSpec { clock4_1.compare(clock4_2) must not be (Concurrent) } - "A clock should happen before an identical clock with a single additional event" in { + "happen before an identical clock with a single additional event" in { val clock1_1 = VectorClock() val clock2_1 = clock1_1.increment(1, System.currentTimeMillis) val clock3_1 = clock2_1.increment(2, System.currentTimeMillis) @@ -121,4 +121,82 @@ class VectorClockSpec extends AkkaSpec { clock5_1.compare(clock3_2) must be(After) } } + + "A Versioned" must { + class TestVersioned(val version: VectorClock = VectorClock()) extends Versioned { + def increment(v: Int, time: Long) = new TestVersioned(version.increment(v, time)) + } + + "have zero versions when created" in { + val versioned = new TestVersioned() + versioned.version.versions must be(Vector()) + } + + "happen before an identical versioned with a single additional event" in { + val versioned1_1 = new TestVersioned() + val versioned2_1 = versioned1_1.increment(1, System.currentTimeMillis) + val versioned3_1 = versioned2_1.increment(2, System.currentTimeMillis) + val versioned4_1 = versioned3_1.increment(1, System.currentTimeMillis) + + val versioned1_2 = new TestVersioned() + val versioned2_2 = versioned1_2.increment(1, System.currentTimeMillis) + val versioned3_2 = versioned2_2.increment(2, System.currentTimeMillis) + val versioned4_2 = versioned3_2.increment(1, System.currentTimeMillis) + val versioned5_2 = versioned4_2.increment(3, System.currentTimeMillis) + + Versioned.latestVersionOf[TestVersioned](versioned4_1, versioned5_2) must be(versioned5_2) + } + + "Two versioneds with different events should be concurrent: 1" in { + var versioned1_1 = new TestVersioned() + val versioned2_1 = versioned1_1.increment(1, System.currentTimeMillis) + + val versioned1_2 = new TestVersioned() + val versioned2_2 = versioned1_2.increment(2, System.currentTimeMillis) + + Versioned.latestVersionOf[TestVersioned](versioned2_1, versioned2_2) must be(versioned2_1) + } + + "Two versioneds with different events should be concurrent: 2" in { + val versioned1_3 = new TestVersioned() + val versioned2_3 = versioned1_3.increment(1, System.currentTimeMillis) + val versioned3_3 = versioned2_3.increment(2, System.currentTimeMillis) + val versioned4_3 = versioned3_3.increment(1, System.currentTimeMillis) + + val versioned1_4 = new TestVersioned() + val versioned2_4 = versioned1_4.increment(1, System.currentTimeMillis) + val versioned3_4 = versioned2_4.increment(1, System.currentTimeMillis) + val versioned4_4 = versioned3_4.increment(3, System.currentTimeMillis) + + Versioned.latestVersionOf[TestVersioned](versioned4_3, versioned4_4) must be(versioned4_3) + } + + "be earlier than another versioned if it has an older version" in { + val versioned1_1 = new TestVersioned() + val versioned2_1 = versioned1_1.increment(2, System.currentTimeMillis) + val versioned3_1 = versioned2_1.increment(2, System.currentTimeMillis) + + val versioned1_2 = new TestVersioned() + val versioned2_2 = versioned1_2.increment(1, System.currentTimeMillis) + val versioned3_2 = versioned2_2.increment(2, System.currentTimeMillis) + val versioned4_2 = versioned3_2.increment(2, System.currentTimeMillis) + val versioned5_2 = versioned4_2.increment(3, System.currentTimeMillis) + + Versioned.latestVersionOf[TestVersioned](versioned3_1, versioned5_2) must be(versioned5_2) + } + + "be later than another versioned if it has an newer version" in { + val versioned1_1 = new TestVersioned() + val versioned2_1 = versioned1_1.increment(1, System.currentTimeMillis) + val versioned3_1 = versioned2_1.increment(2, System.currentTimeMillis) + val versioned4_1 = versioned3_1.increment(2, System.currentTimeMillis) + val versioned5_1 = versioned4_1.increment(3, System.currentTimeMillis) + + val versioned1_2 = new TestVersioned() + val versioned2_2 = versioned1_2.increment(2, System.currentTimeMillis) + val versioned3_2 = versioned2_2.increment(2, System.currentTimeMillis) + + Versioned.latestVersionOf[TestVersioned](versioned5_1, versioned3_2) must be(versioned5_1) + } + } }