/** * Copyright (C) 2009-2017 Lightbend Inc. */ package akka.cluster.ddata import scala.concurrent.duration._ import akka.actor.Actor import akka.actor.ActorRef import akka.actor.ActorSelection import akka.actor.ActorSystem import akka.actor.Address import akka.actor.Props import akka.testkit._ import akka.cluster.ddata.Replicator.Internal._ import akka.cluster.ddata.Replicator._ import akka.remote.RARP import scala.concurrent.Future object WriteAggregatorSpec { val key = GSetKey[String]("a") def writeAggregatorProps(data: GSet[String], consistency: Replicator.WriteConsistency, probes: Map[Address, ActorRef], nodes: Set[Address], unreachable: Set[Address], replyTo: ActorRef, durable: Boolean): Props = Props(new TestWriteAggregator(data, consistency, probes, nodes, unreachable, replyTo, durable)) class TestWriteAggregator(data: GSet[String], consistency: Replicator.WriteConsistency, probes: Map[Address, ActorRef], nodes: Set[Address], unreachable: Set[Address], replyTo: ActorRef, durable: Boolean) extends WriteAggregator(key, DataEnvelope(data), consistency, None, nodes, unreachable, replyTo, durable) { override def replica(address: Address): ActorSelection = context.actorSelection(probes(address).path) override def senderAddress(): Address = probes.find { case (a, r) ⇒ r == sender() }.get._1 } def writeAckAdapterProps(replica: ActorRef): Props = Props(new WriteAckAdapter(replica)) class WriteAckAdapter(replica: ActorRef) extends Actor { var replicator: Option[ActorRef] = None def receive = { case WriteAck ⇒ replicator.foreach(_ ! WriteAck) case WriteNack ⇒ replicator.foreach(_ ! WriteNack) case msg ⇒ replicator = Some(sender()) replica ! msg } } object TestMock { def apply()(implicit system: ActorSystem) = new TestMock(system) } class TestMock(_application: ActorSystem) extends TestProbe(_application) { val writeAckAdapter = system.actorOf(WriteAggregatorSpec.writeAckAdapterProps(this.ref)) } } class WriteAggregatorSpec extends AkkaSpec(s""" akka.actor.provider = "cluster" akka.remote.netty.tcp.port = 0 akka.remote.artery.canonical.port = 0 akka.cluster.distributed-data.durable.lmdb { dir = target/WriteAggregatorSpec-${System.currentTimeMillis}-ddata map-size = 10 MiB } """) with ImplicitSender { import WriteAggregatorSpec._ val protocol = if (RARP(system).provider.remoteSettings.Artery.Enabled) "akka" else "akka.tcp" val nodeA = Address(protocol, "Sys", "a", 2552) val nodeB = nodeA.copy(host = Some("b")) val nodeC = nodeA.copy(host = Some("c")) val nodeD = nodeA.copy(host = Some("d")) // 4 replicas + the local => 5 val nodes = Set(nodeA, nodeB, nodeC, nodeD) val data = GSet.empty + "A" + "B" val timeout = 3.seconds.dilated val writeThree = WriteTo(3, timeout) val writeMajority = WriteMajority(timeout) def probes(probe: ActorRef): Map[Address, ActorRef] = nodes.toSeq.map(_ → system.actorOf(WriteAggregatorSpec.writeAckAdapterProps(probe))).toMap /** * Create a tuple for each node with the WriteAckAdapter and the TestProbe */ def probes(): Map[Address, TestMock] = { val probe = TestProbe() nodes.toSeq.map(_ → TestMock()).toMap } "WriteAggregator" must { "send to at least N/2+1 replicas when WriteMajority" in { val probe = TestProbe() val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps( data, writeMajority, probes(probe.ref), nodes, Set.empty, testActor, durable = false)) probe.expectMsgType[Write] probe.lastSender ! WriteAck probe.expectMsgType[Write] probe.lastSender ! WriteAck expectMsg(UpdateSuccess(WriteAggregatorSpec.key, None)) watch(aggr) expectTerminated(aggr) } "send to more when no immediate reply" in { val testProbes = probes() val testProbeRefs = testProbes.map { case (a, tm) ⇒ a → tm.writeAckAdapter } val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps( data, writeMajority, testProbeRefs, nodes, Set(nodeC, nodeD), testActor, durable = false)) testProbes(nodeA).expectMsgType[Write] // no reply testProbes(nodeB).expectMsgType[Write] testProbes(nodeB).lastSender ! WriteAck // Make sure that unreachable nodes do not get a message until 1/5 of the time the reachable nodes did not answer val t = timeout / 5 - 50.milliseconds.dilated import system.dispatcher Future.sequence { Seq( Future { testProbes(nodeC).expectNoMsg(t) }, Future { testProbes(nodeD).expectNoMsg(t) } ) }.futureValue testProbes(nodeC).expectMsgType[Write] testProbes(nodeC).lastSender ! WriteAck testProbes(nodeD).expectMsgType[Write] testProbes(nodeD).lastSender ! WriteAck expectMsg(UpdateSuccess(WriteAggregatorSpec.key, None)) watch(aggr) expectTerminated(aggr) } "timeout when less than required acks" in { val probe = TestProbe() val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps( data, writeMajority, probes(probe.ref), nodes, Set.empty, testActor, durable = false)) probe.expectMsgType[Write] // no reply probe.expectMsgType[Write] probe.lastSender ! WriteAck probe.expectMsgType[Write] // no reply probe.expectMsgType[Write] // no reply expectMsg(UpdateTimeout(WriteAggregatorSpec.key, None)) watch(aggr) expectTerminated(aggr) } "calculate majority with minCap" in { val minCap = 5 import ReadWriteAggregator._ calculateMajorityWithMinCap(minCap, 3) should be(3) calculateMajorityWithMinCap(minCap, 4) should be(4) calculateMajorityWithMinCap(minCap, 5) should be(5) calculateMajorityWithMinCap(minCap, 6) should be(5) calculateMajorityWithMinCap(minCap, 7) should be(5) calculateMajorityWithMinCap(minCap, 8) should be(5) calculateMajorityWithMinCap(minCap, 9) should be(5) calculateMajorityWithMinCap(minCap, 10) should be(6) calculateMajorityWithMinCap(minCap, 11) should be(6) calculateMajorityWithMinCap(minCap, 12) should be(7) } } "Durable WriteAggregator" must { "not reply before local confirmation" in { val probe = TestProbe() val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps( data, writeThree, probes(probe.ref), nodes, Set.empty, testActor, durable = true)) probe.expectMsgType[Write] probe.lastSender ! WriteAck probe.expectMsgType[Write] probe.lastSender ! WriteAck expectNoMsg(200.millis) // the local write aggr ! UpdateSuccess(WriteAggregatorSpec.key, None) expectMsg(UpdateSuccess(WriteAggregatorSpec.key, None)) watch(aggr) expectTerminated(aggr) } "tolerate WriteNack if enough WriteAck" in { val probe = TestProbe() val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps( data, writeThree, probes(probe.ref), nodes, Set.empty, testActor, durable = true)) aggr ! UpdateSuccess(WriteAggregatorSpec.key, None) // the local write probe.expectMsgType[Write] probe.lastSender ! WriteAck probe.expectMsgType[Write] probe.lastSender ! WriteNack probe.expectMsgType[Write] probe.lastSender ! WriteAck expectMsg(UpdateSuccess(WriteAggregatorSpec.key, None)) watch(aggr) expectTerminated(aggr) } "reply with StoreFailure when too many nacks" in { val probe = TestProbe() val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps( data, writeMajority, probes(probe.ref), nodes, Set.empty, testActor, durable = true)) probe.expectMsgType[Write] probe.lastSender ! WriteNack aggr ! UpdateSuccess(WriteAggregatorSpec.key, None) // the local write probe.expectMsgType[Write] probe.lastSender ! WriteAck probe.expectMsgType[Write] probe.lastSender ! WriteNack probe.expectMsgType[Write] probe.lastSender ! WriteNack expectMsg(StoreFailure(WriteAggregatorSpec.key, None)) watch(aggr) expectTerminated(aggr) } "timeout when less than required acks" in { val probe = TestProbe() val aggr = system.actorOf(WriteAggregatorSpec.writeAggregatorProps( data, writeMajority, probes(probe.ref), nodes, Set.empty, testActor, durable = true)) probe.expectMsgType[Write] // no reply probe.expectMsgType[Write] probe.lastSender ! WriteAck probe.expectMsgType[Write] probe.lastSender ! WriteNack probe.expectMsgType[Write] probe.lastSender ! WriteNack expectMsg(UpdateTimeout(WriteAggregatorSpec.key, None)) watch(aggr) expectTerminated(aggr) } } }