/**
 * Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.cluster.ddata

import scala.concurrent.duration._

import akka.cluster.Cluster
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.transport.ThrottlerTransportAdapter.Direction
import akka.testkit._
import com.typesafe.config.ConfigFactory

object ReplicatorSpec extends MultiNodeConfig {
  val first = role("first")
  val second = role("second")
  val third = role("third")

  commonConfig(ConfigFactory.parseString("""
    akka.loglevel = INFO
    akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
    akka.log-dead-letters-during-shutdown = off
    """))

  testTransport(on = true)
}
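
// One concrete spec class per role: the sbt-multi-jvm plugin picks up the
// *MultiJvmNode* names and runs each of them in its own JVM.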
class ReplicatorSpecMultiJvmNode1 extends ReplicatorSpec
class ReplicatorSpecMultiJvmNode2 extends ReplicatorSpec
class ReplicatorSpecMultiJvmNode3 extends ReplicatorSpec
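
// Shared test body: every node runs the same steps and the steps are kept in
// lockstep with barriers.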
class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec with ImplicitSender {
  import ReplicatorSpec._
  import Replicator._

  override def initialParticipants = roles.size

  implicit val cluster = Cluster(system)
  val replicator = system.actorOf(Replicator.props(
    ReplicatorSettings(system).withGossipInterval(1.second).withMaxDeltaElements(10)), "replicator")
  val timeout = 2.seconds.dilated
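
  // Write/read consistency levels shared by the tests below. The timeout is
  // dilated by the test time factor so slower machines do not cause spurious failures.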
  val writeTwo = WriteTo(2, timeout)
  val writeMajority = WriteMajority(timeout)
  val writeAll = WriteAll(timeout)
  val readTwo = ReadFrom(2, timeout)
  val readAll = ReadAll(timeout)
  val readMajority = ReadMajority(timeout)
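
  // Typed keys: the type parameter ties each entry to the CRDT stored under it
  // (GCounter, ORSet[String], ORMap[Flag]).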
  val KeyA = GCounterKey("A")
  val KeyB = GCounterKey("B")
  val KeyC = GCounterKey("C")
  val KeyD = GCounterKey("D")
  val KeyE = GCounterKey("E")
  val KeyE2 = GCounterKey("E2")
  val KeyF = GCounterKey("F")
  val KeyG = ORSetKey[String]("G")
  val KeyH = ORMapKey[Flag]("H")
  val KeyX = GCounterKey("X")
  val KeyY = GCounterKey("Y")
  val KeyZ = GCounterKey("Z")
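
  // Joins `from` to the cluster node at `to` and then synchronizes all nodes
  // with a barrier, so no node runs ahead of the join.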
  def join(from: RoleName, to: RoleName): Unit = {
    runOn(from) {
      cluster join node(to).address
    }
    enterBarrier(from.name + "-joined")
  }

  "Cluster CRDT" must {

    "work in single node cluster" in {
      join(first, first)

      runOn(first) {

        within(5.seconds) {
          awaitAssert {
            replicator ! GetReplicaCount
            expectMsg(ReplicaCount(1))
          }
        }

        val changedProbe = TestProbe()
        replicator ! Subscribe(KeyA, changedProbe.ref)
        replicator ! Subscribe(KeyX, changedProbe.ref)

        replicator ! Get(KeyA, ReadLocal)
        expectMsg(NotFound(KeyA, None))

        val c3 = GCounter() + 3
        replicator ! Update(KeyA, GCounter(), WriteLocal)(_ + 3)
        expectMsg(UpdateSuccess(KeyA, None))
        replicator ! Get(KeyA, ReadLocal)
        expectMsg(GetSuccess(KeyA, None)(c3)).dataValue should be(c3)
        changedProbe.expectMsg(Changed(KeyA)(c3)).dataValue should be(c3)

        val changedProbe2 = TestProbe()
        replicator ! Subscribe(KeyA, changedProbe2.ref)
        changedProbe2.expectMsg(Changed(KeyA)(c3)).dataValue should be(c3)

        val c4 = c3 + 1
        // too strong consistency level for a single-node cluster: WriteTo(2) times out,
        // but the update is still applied locally
        replicator ! Update(KeyA, GCounter(), writeTwo)(_ + 1)
        expectMsg(UpdateTimeout(KeyA, None))
        replicator ! Get(KeyA, ReadLocal)
        expectMsg(GetSuccess(KeyA, None)(c4)).dataValue should be(c4)
        changedProbe.expectMsg(Changed(KeyA)(c4)).dataValue should be(c4)

        val c5 = c4 + 1
        // a majority of a single-node cluster is the node itself, so this succeeds
        replicator ! Update(KeyA, GCounter(), writeMajority)(_ + 1)
        expectMsg(UpdateSuccess(KeyA, None))
        replicator ! Get(KeyA, readMajority)
        expectMsg(GetSuccess(KeyA, None)(c5)).dataValue should be(c5)
        changedProbe.expectMsg(Changed(KeyA)(c5)).dataValue should be(c5)

        val c6 = c5 + 1
        replicator ! Update(KeyA, GCounter(), writeAll)(_ + 1)
        expectMsg(UpdateSuccess(KeyA, None))
        replicator ! Get(KeyA, readAll)
        expectMsg(GetSuccess(KeyA, None)(c6)).dataValue should be(c6)
        changedProbe.expectMsg(Changed(KeyA)(c6)).dataValue should be(c6)

        val c9 = GCounter() + 9
        replicator ! Update(KeyX, GCounter(), WriteLocal)(_ + 9)
        expectMsg(UpdateSuccess(KeyX, None))
        changedProbe.expectMsg(Changed(KeyX)(c9)).dataValue should be(c9)
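
        // Once deleted, an entry cannot be used again: reads, updates and repeated
        // deletes are all answered with DataDeleted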
        replicator ! Delete(KeyX, WriteLocal)
        expectMsg(DeleteSuccess(KeyX))
        changedProbe.expectMsg(DataDeleted(KeyX))
        replicator ! Get(KeyX, ReadLocal)
        expectMsg(DataDeleted(KeyX))
        replicator ! Get(KeyX, readAll)
        expectMsg(DataDeleted(KeyX))
        replicator ! Update(KeyX, GCounter(), WriteLocal)(_ + 1)
        expectMsg(DataDeleted(KeyX))
        replicator ! Delete(KeyX, WriteLocal)
        expectMsg(DataDeleted(KeyX))

        replicator ! GetKeyIds
        expectMsg(GetKeyIdsResult(Set("A")))
      }

      enterBarrier("after-1")
    }
  }

  "Cluster CRDT" must {
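
    // An exception thrown by the modify function is reported back as ModifyFailure
    // and the update is not applied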
    "reply with ModifyFailure if exception is thrown by modify function" in {
      val e = new RuntimeException("errr")
      replicator ! Update(KeyA, GCounter(), WriteLocal)(_ ⇒ throw e)
      expectMsgType[ModifyFailure[_]].cause should be(e)
    }

    "replicate values to new node" in {
      join(second, first)

      runOn(first, second) {
        within(10.seconds) {
          awaitAssert {
            replicator ! GetReplicaCount
            expectMsg(ReplicaCount(2))
          }
        }
      }

      enterBarrier("2-nodes")

      runOn(second) {
        val changedProbe = TestProbe()
        replicator ! Subscribe(KeyA, changedProbe.ref)
        // "A" should be replicated via gossip to the new node
        within(5.seconds) {
          awaitAssert {
            replicator ! Get(KeyA, ReadLocal)
            val c = expectMsgPF() { case g @ GetSuccess(KeyA, _) ⇒ g.get(KeyA) }
            c.value should be(6)
          }
        }
        val c = changedProbe.expectMsgPF() { case c @ Changed(KeyA) ⇒ c.get(KeyA) }
        c.value should be(6)
      }

      enterBarrier("after-2")
    }

    "work in 2 node cluster" in {

      runOn(first, second) {
        // start with 20 on both nodes
        replicator ! Update(KeyB, GCounter(), WriteLocal)(_ + 20)
        expectMsg(UpdateSuccess(KeyB, None))

        // add 1 on both nodes using writeTwo
        replicator ! Update(KeyB, GCounter(), writeTwo)(_ + 1)
        expectMsg(UpdateSuccess(KeyB, None))

        // the total, after replication, should be 42
        awaitAssert {
          replicator ! Get(KeyB, readTwo)
          val c = expectMsgPF() { case g @ GetSuccess(KeyB, _) ⇒ g.get(KeyB) }
          c.value should be(42)
        }
      }
      enterBarrier("update-42")

      runOn(first, second) {
        // add 1 on both nodes using writeAll
        replicator ! Update(KeyB, GCounter(), writeAll)(_ + 1)
        expectMsg(UpdateSuccess(KeyB, None))

        // the total, after replication, should be 44
        awaitAssert {
          replicator ! Get(KeyB, readAll)
          val c = expectMsgPF() { case g @ GetSuccess(KeyB, _) ⇒ g.get(KeyB) }
          c.value should be(44)
        }
      }
      enterBarrier("update-44")

      runOn(first, second) {
        // add 1 on both nodes using writeMajority
        replicator ! Update(KeyB, GCounter(), writeMajority)(_ + 1)
        expectMsg(UpdateSuccess(KeyB, None))

        // the total, after replication, should be 46
        awaitAssert {
          replicator ! Get(KeyB, readMajority)
          val c = expectMsgPF() { case g @ GetSuccess(KeyB, _) ⇒ g.get(KeyB) }
          c.value should be(46)
        }
      }

      enterBarrier("after-3")
    }

    "be replicated after successful update" in {
      val changedProbe = TestProbe()
      runOn(first, second) {
        replicator ! Subscribe(KeyC, changedProbe.ref)
      }

      runOn(first) {
        replicator ! Update(KeyC, GCounter(), writeTwo)(_ + 30)
        expectMsg(UpdateSuccess(KeyC, None))
        changedProbe.expectMsgPF() { case c @ Changed(KeyC) ⇒ c.get(KeyC).value } should be(30)

        replicator ! Update(KeyY, GCounter(), writeTwo)(_ + 30)
        expectMsg(UpdateSuccess(KeyY, None))

        replicator ! Update(KeyZ, GCounter(), writeMajority)(_ + 30)
        expectMsg(UpdateSuccess(KeyZ, None))
      }
      enterBarrier("update-c30")

      runOn(second) {
        replicator ! Get(KeyC, ReadLocal)
        val c30 = expectMsgPF() { case g @ GetSuccess(KeyC, _) ⇒ g.get(KeyC) }
        c30.value should be(30)
        changedProbe.expectMsgPF() { case c @ Changed(KeyC) ⇒ c.get(KeyC).value } should be(30)

        // replicate with gossip after WriteLocal
        replicator ! Update(KeyC, GCounter(), WriteLocal)(_ + 1)
        expectMsg(UpdateSuccess(KeyC, None))
        changedProbe.expectMsgPF() { case c @ Changed(KeyC) ⇒ c.get(KeyC).value } should be(31)

        replicator ! Delete(KeyY, WriteLocal)
        expectMsg(DeleteSuccess(KeyY))

        replicator ! Get(KeyZ, readMajority)
        expectMsgPF() { case g @ GetSuccess(KeyZ, _) ⇒ g.get(KeyZ).value } should be(30)
      }
      enterBarrier("update-c31")

      runOn(first) {
        // KeyC and deleted KeyY should be replicated via gossip to the other node
        within(5.seconds) {
          awaitAssert {
            replicator ! Get(KeyC, ReadLocal)
            val c = expectMsgPF() { case g @ GetSuccess(KeyC, _) ⇒ g.get(KeyC) }
            c.value should be(31)

            replicator ! Get(KeyY, ReadLocal)
            expectMsg(DataDeleted(KeyY))
          }
        }
        changedProbe.expectMsgPF() { case c @ Changed(KeyC) ⇒ c.get(KeyC).value } should be(31)
      }
      enterBarrier("verified-c31")

      // and also check that concurrent updates are merged
      runOn(first, second) {
        replicator ! Get(KeyC, ReadLocal)
        val c31 = expectMsgPF() { case g @ GetSuccess(KeyC, _) ⇒ g.get(KeyC) }
        c31.value should be(31)

        replicator ! Update(KeyC, GCounter(), WriteLocal)(_ + 1)
        expectMsg(UpdateSuccess(KeyC, None))

        within(5.seconds) {
          awaitAssert {
            replicator ! Get(KeyC, ReadLocal)
            val c = expectMsgPF() { case g @ GetSuccess(KeyC, _) ⇒ g.get(KeyC) }
            c.value should be(33)
          }
        }
      }

      enterBarrier("after-4")
    }

    "converge after partition" in {
      runOn(first) {
        replicator ! Update(KeyD, GCounter(), writeTwo)(_ + 40)
        expectMsg(UpdateSuccess(KeyD, None))

        testConductor.blackhole(first, second, Direction.Both).await
      }
      enterBarrier("blackhole-first-second")

      runOn(first, second) {
        replicator ! Get(KeyD, ReadLocal)
        val c40 = expectMsgPF() { case g @ GetSuccess(KeyD, _) ⇒ g.get(KeyD) }
        c40.value should be(40)
        replicator ! Update(KeyD, GCounter() + 1, writeTwo)(_ + 1)
        expectMsg(UpdateTimeout(KeyD, None))
        replicator ! Update(KeyD, GCounter(), writeTwo)(_ + 1)
        expectMsg(UpdateTimeout(KeyD, None))
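        // both updates above were still applied locally; UpdateTimeout only means that
        // replication to the requested number of replicas did not complete within the timeout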
      }
      runOn(first) {
        for (n ← 1 to 30) {
          val KeyDn = GCounterKey("D" + n)
          replicator ! Update(KeyDn, GCounter(), WriteLocal)(_ + n)
          expectMsg(UpdateSuccess(KeyDn, None))
        }
      }
      enterBarrier("updates-during-partition")

      runOn(first) {
        testConductor.passThrough(first, second, Direction.Both).await
      }
      enterBarrier("passThrough-first-second")

      runOn(first, second) {
        replicator ! Get(KeyD, readTwo)
        val c44 = expectMsgPF() { case g @ GetSuccess(KeyD, _) ⇒ g.get(KeyD) }
        c44.value should be(44)

        within(10.seconds) {
          awaitAssert {
            for (n ← 1 to 30) {
              val KeyDn = GCounterKey("D" + n)
              replicator ! Get(KeyDn, ReadLocal)
              expectMsgPF() { case g @ GetSuccess(KeyDn, _) ⇒ g.get(KeyDn) }.value should be(n)
            }
          }
        }
      }

      enterBarrier("after-5")
    }

    "support majority quorum write and read with 3 nodes with 1 unreachable" in {
      join(third, first)

      runOn(first, second, third) {
        within(10.seconds) {
          awaitAssert {
            replicator ! GetReplicaCount
            expectMsg(ReplicaCount(3))
          }
        }
      }
      enterBarrier("3-nodes")

      runOn(first, second, third) {
        replicator ! Update(KeyE, GCounter(), writeMajority)(_ + 50)
        expectMsg(UpdateSuccess(KeyE, None))
      }
      enterBarrier("write-initial-majority")

      runOn(first, second, third) {
        replicator ! Get(KeyE, readMajority)
        val c150 = expectMsgPF() { case g @ GetSuccess(KeyE, _) ⇒ g.get(KeyE) }
        c150.value should be(150)
      }
      enterBarrier("read-initial-majority")

      runOn(first) {
        testConductor.blackhole(first, third, Direction.Both).await
        testConductor.blackhole(second, third, Direction.Both).await
      }
      enterBarrier("blackhole-third")

      runOn(second) {
        replicator ! Update(KeyE, GCounter(), WriteLocal)(_ + 1)
        expectMsg(UpdateSuccess(KeyE, None))
      }
      enterBarrier("local-update-from-second")

      runOn(first) {
        // ReadMajority should retrieve the previous update from second before applying the modification
        val probe1 = TestProbe()
        val probe2 = TestProbe()
        replicator.tell(Get(KeyE, readMajority), probe2.ref)
        probe2.expectMsgType[GetSuccess[_]]
        replicator.tell(Update(KeyE, GCounter(), writeMajority, None) { data ⇒
          probe1.ref ! data.value
          data + 1
        }, probe2.ref)
        // verify read your own writes, without waiting for the UpdateSuccess reply
        // note that the order of the replies is not defined, and therefore we use separate probes
        val probe3 = TestProbe()
        replicator.tell(Get(KeyE, readMajority), probe3.ref)
        probe1.expectMsg(151)
        probe2.expectMsg(UpdateSuccess(KeyE, None))
        val c152 = probe3.expectMsgPF() { case g @ GetSuccess(KeyE, _) ⇒ g.get(KeyE) }
        c152.value should be(152)
      }
      enterBarrier("majority-update-from-first")

      runOn(second) {
        val probe1 = TestProbe()
        replicator.tell(Get(KeyE, readMajority), probe1.ref)
        probe1.expectMsgType[GetSuccess[_]]
        replicator.tell(Update(KeyE, GCounter(), writeMajority, Some(153))(_ + 1), probe1.ref)
        // verify read your own writes, without waiting for the UpdateSuccess reply
        // note that the order of the replies is not defined, and therefore we use separate probes
        val probe2 = TestProbe()
        replicator.tell(Update(KeyE, GCounter(), writeMajority, Some(154))(_ + 1), probe2.ref)
        val probe3 = TestProbe()
        replicator.tell(Update(KeyE, GCounter(), writeMajority, Some(155))(_ + 1), probe3.ref)
        val probe5 = TestProbe()
        replicator.tell(Get(KeyE, readMajority), probe5.ref)
        probe1.expectMsg(UpdateSuccess(KeyE, Some(153)))
        probe2.expectMsg(UpdateSuccess(KeyE, Some(154)))
        probe3.expectMsg(UpdateSuccess(KeyE, Some(155)))
        val c155 = probe5.expectMsgPF() { case g @ GetSuccess(KeyE, _) ⇒ g.get(KeyE) }
        c155.value should be(155)
      }
      enterBarrier("majority-update-from-second")

      runOn(first, second) {
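        // with third unreachable, ReadAll cannot be satisfied and the Get fails;
        // locally the key was never written, so ReadLocal answers NotFound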
        replicator ! Get(KeyE2, readAll, Some(998))
        expectMsg(GetFailure(KeyE2, Some(998)))
        replicator ! Get(KeyE2, ReadLocal)
        expectMsg(NotFound(KeyE2, None))
      }
      enterBarrier("read-all-fail-update")

      runOn(first) {
        testConductor.passThrough(first, third, Direction.Both).await
        testConductor.passThrough(second, third, Direction.Both).await
      }
      enterBarrier("passThrough-third")

      runOn(third) {
        replicator ! Get(KeyE, readMajority)
        val c155 = expectMsgPF() { case g @ GetSuccess(KeyE, _) ⇒ g.get(KeyE) }
        c155.value should be(155)
      }

      enterBarrier("after-6")
    }

    "converge after many concurrent updates" in within(10.seconds) {
      runOn(first, second, third) {
        var c = GCounter()
        for (i ← 0 until 100) {
          c += 1
          replicator ! Update(KeyF, GCounter(), writeTwo)(_ + 1)
        }
        val results = receiveN(100)
        results.map(_.getClass).toSet should be(Set(classOf[UpdateSuccess[_]]))
      }
      enterBarrier("100-updates-done")
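      // each of the three nodes incremented the counter 100 times,
      // so the merged value must converge to 300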
      runOn(first, second, third) {
        replicator ! Get(KeyF, readTwo)
        val c = expectMsgPF() { case g @ GetSuccess(KeyF, _) ⇒ g.get(KeyF) }
        c.value should be(3 * 100)
      }
      enterBarrier("after-7")
    }

    "read-repair happens before GetSuccess" in {
      runOn(first) {
        replicator ! Update(KeyG, ORSet(), writeTwo)(_ + "a" + "b")
        expectMsgType[UpdateSuccess[_]]
      }
      enterBarrier("a-b-added-to-G")
      runOn(second) {
        replicator ! Get(KeyG, readAll)
        expectMsgPF() { case g @ GetSuccess(KeyG, _) ⇒ g.get(KeyG).elements } should be(Set("a", "b"))
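        // the ReadAll above triggered read-repair, so the value is now stored locally as well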
        replicator ! Get(KeyG, ReadLocal)
        expectMsgPF() { case g @ GetSuccess(KeyG, _) ⇒ g.get(KeyG).elements } should be(Set("a", "b"))
      }
      enterBarrier("after-8")
    }

    "check that a remote update and a local update both cause a change event to emit with the merged data" in {
      val changedProbe = TestProbe()

      runOn(second) {
        replicator ! Subscribe(KeyH, changedProbe.ref)
        replicator ! Update(KeyH, ORMap.empty[Flag], writeTwo)(_ + ("a" -> Flag(enabled = false)))
        changedProbe.expectMsgPF() { case c @ Changed(KeyH) ⇒ c.get(KeyH).entries } should be(Map("a" -> Flag(enabled = false)))
      }

      enterBarrier("update-h1")
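
      // the remote update from first must reach the subscriber on second
      // merged with the entry written locally above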
      runOn(first) {
        replicator ! Update(KeyH, ORMap.empty[Flag], writeTwo)(_ + ("a" -> Flag(enabled = true)))
      }

      runOn(second) {
        changedProbe.expectMsgPF() { case c @ Changed(KeyH) ⇒ c.get(KeyH).entries } should be(Map("a" -> Flag(enabled = true)))

        replicator ! Update(KeyH, ORMap.empty[Flag], writeTwo)(_ + ("b" -> Flag(enabled = true)))
        changedProbe.expectMsgPF() { case c @ Changed(KeyH) ⇒ c.get(KeyH).entries } should be(
          Map("a" -> Flag(enabled = true), "b" -> Flag(enabled = true)))
      }

      enterBarrier("after-9")
    }
  }