From 180ef934bc698610d1a55f6005dea6cd560fb0f9 Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Wed, 11 Jul 2018 13:14:25 +0200
Subject: [PATCH] Improve performance of DData delta updates, #25310

* Use a deterministic order of the target nodes for the writes when the
  data type is RequiresCausalDeliveryOfDeltas; otherwise the random pick
  of targets caused delta sequence numbers to be missing for subsequent
  updates
* Resend immediately when receiving DeltaNack instead of waiting for the
  timeout. DeltaNack can happen when there are multiple concurrent updates
  from the same node, because each starts a WriteAggregator and a later
  Update might bypass an earlier one
---
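Review note (illustration only, not part of the change): the first fix boils
down to choosing a stable target order instead of a random shuffle whenever
the data type requires causal delivery of deltas. A minimal sketch of that
decision follows; `reachableNodes` and `unreachable` stand in for the
aggregator's internal state, and sortBy(_.toString) is used only to keep the
sketch self-contained (the real code in Replicator.scala sorts with the
Ordering[Address] already in scope there):

    import akka.actor.Address
    import scala.util.Random

    object TargetOrder {
      def orderedTargets(
        reachableNodes: Set[Address],
        unreachable: Set[Address],
        requiresCausalDeliveryOfDeltas: Boolean): Vector[Address] =
        if (requiresCausalDeliveryOfDeltas)
          // Same order on every write, so each target sees contiguous
          // delta sequence numbers across subsequent updates.
          reachableNodes.toVector.sortBy(_.toString) ++
            unreachable.toVector.sortBy(_.toString)
        else
          // Random spread is fine when deltas may be applied in any order,
          // but reachable nodes are still tried before unreachable ones.
          Random.shuffle(reachableNodes.toVector) ++
            Random.shuffle(unreachable.toVector)
    }
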
 .../mima-filters/2.5.13.backwards.excludes    |  4 +++
 .../scala/akka/cluster/ddata/Replicator.scala | 29 +++++++++++++---
 .../akka/cluster/ddata/PerformanceSpec.scala  | 34 +++++++++++++++++--
 3 files changed, 59 insertions(+), 8 deletions(-)

diff --git a/akka-distributed-data/src/main/mima-filters/2.5.13.backwards.excludes b/akka-distributed-data/src/main/mima-filters/2.5.13.backwards.excludes
index 2a142e7280..11114c2d51 100644
--- a/akka-distributed-data/src/main/mima-filters/2.5.13.backwards.excludes
+++ b/akka-distributed-data/src/main/mima-filters/2.5.13.backwards.excludes
@@ -1,3 +1,7 @@
+# #25310 Improve performance of DData delta updates
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.ReadWriteAggregator.secondaryNodes")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.ReadWriteAggregator.primaryNodes")
+
 # #23703 Optimized serializer for ORSet[ActorRef]
 ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages#GSetOrBuilder.getActorRefElementsCount")
 ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages#GSetOrBuilder.getActorRefElementsList")
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala
index 0d34a10657..aa5ffc3e0b 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala
@@ -1948,13 +1948,17 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
 
   def doneWhenRemainingSize: Int
 
-  lazy val (primaryNodes, secondaryNodes) = {
+  def primaryAndSecondaryNodes(requiresCausalDeliveryOfDeltas: Boolean): (Vector[Address], Vector[Address]) = {
     val primarySize = nodes.size - doneWhenRemainingSize
     if (primarySize >= nodes.size)
-      (nodes, Set.empty[Address])
+      (nodes.toVector, Vector.empty[Address])
     else {
-      // Prefer to use reachable nodes over the unreachable nodes first
-      val orderedNodes = scala.util.Random.shuffle(reachableNodes.toVector) ++ scala.util.Random.shuffle(unreachable.toVector)
+      // Prefer to use reachable nodes over the unreachable nodes first.
+      // When RequiresCausalDeliveryOfDeltas use deterministic order so that sequence numbers
+      // of subsequent updates are in sync on the destination nodes.
+      val orderedNodes =
+        if (requiresCausalDeliveryOfDeltas) reachableNodes.toVector.sorted ++ unreachable.toVector.sorted
+        else scala.util.Random.shuffle(reachableNodes.toVector) ++ scala.util.Random.shuffle(unreachable.toVector)
       val (p, s) = orderedNodes.splitAt(primarySize)
       (p, s.take(MaxSecondaryNodes))
     }
@@ -2030,6 +2034,14 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
     var gotLocalStoreReply = !durable
     var gotWriteNackFrom = Set.empty[Address]
 
+    private val (primaryNodes, secondaryNodes) = {
+      val requiresCausalDeliveryOfDeltas = delta match {
+        case None    ⇒ false
+        case Some(d) ⇒ d.dataEnvelope.data.isInstanceOf[RequiresCausalDeliveryOfDeltas]
+      }
+      primaryAndSecondaryNodes(requiresCausalDeliveryOfDeltas)
+    }
+
     override def preStart(): Unit = {
       val msg = deltaMsg match {
         case Some(d) ⇒ d
@@ -2048,7 +2060,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
         gotWriteNackFrom += senderAddress()
         if (isDone) reply(isTimeout = false)
       case DeltaNack ⇒
-        // ok, will be retried with full state
+        // Deltas must be applied in order and we can't keep track of the ordering of
+        // simultaneous updates, so there is a chance that the delta could not be applied.
+        // Try again with the full state.
+        sender() ! writeMsg
 
       case _: Replicator.UpdateSuccess[_] ⇒
         gotLocalStoreReply = true
@@ -2148,6 +2163,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
 
     val readMsg = Read(key.id)
 
+    private val (primaryNodes, secondaryNodes) = {
+      primaryAndSecondaryNodes(requiresCausalDeliveryOfDeltas = false)
+    }
+
     override def preStart(): Unit = {
       primaryNodes.foreach { replica(_) ! readMsg }
 
diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/PerformanceSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/PerformanceSpec.scala
index 44184fdb94..d791cbc669 100644
--- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/PerformanceSpec.scala
+++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/PerformanceSpec.scala
@@ -27,6 +27,7 @@ object PerformanceSpec extends MultiNodeConfig {
   commonConfig(ConfigFactory.parseString(s"""
     akka.loglevel = ERROR
     akka.stdout-loglevel = ERROR
+    akka.loggers = ["akka.event.Logging$$DefaultLogger"]
     akka.actor.provider = "cluster"
     akka.log-dead-letters = off
     akka.log-dead-letters-during-shutdown = off
@@ -81,21 +82,28 @@ class PerformanceSpec extends MultiNodeSpec(PerformanceSpec) with STMultiNodeSpe
     enterBarrier(from.name + "-joined")
   }
 
-  def repeat(description: String, keys: Iterable[ORSetKey[Int]], n: Int, expectedAfterReplication: Option[Set[Int]] = None)(
+  def repeat(description: String, keys: Iterable[ORSetKey[Int]], n: Int,
+             expectedAfterReplication: Option[Set[Int]] = None, oneByOne: Boolean = false)(
     block: (ORSetKey[Int], Int, ActorRef) ⇒ Unit,
     afterEachKey: ORSetKey[Int] ⇒ Unit = _ ⇒ ()): Unit = {
 
     keys.foreach { key ⇒
       val startTime = System.nanoTime()
       runOn(n1) {
         val latch = TestLatch(n)
-        val replyTo = system.actorOf(countDownProps(latch))
+        val oneByOneProbe = TestProbe()
+        val replyTo =
+          if (oneByOne) oneByOneProbe.ref
+          else system.actorOf(countDownProps(latch))
         var i = 0
         while (i < n) {
           block(key, i, replyTo)
           i += 1
+          if (oneByOne)
+            oneByOneProbe.receiveOne(timeout)
         }
-        Await.ready(latch, 10.seconds + (2.second * factor))
+        if (!oneByOne)
+          Await.ready(latch, 10.seconds + (2.second * factor))
       }
       expectedAfterReplication.foreach { expected ⇒
         enterBarrier("repeat-" + key + "-before-awaitReplicated")
@@ -234,6 +242,26 @@ class PerformanceSpec extends MultiNodeSpec(PerformanceSpec) with STMultiNodeSpe
       enterBarrier("after-6")
     }
 
+    "be good for ORSet one-by-one deltas" taggedAs PerformanceTest in {
+      val keys = (1 to repeatCount).map(n ⇒ ORSetKey[Int]("E" + n))
+      val n = 300 * factor
+      val writeMajority = WriteMajority(timeout)
+      repeat("ORSet Update one-by-one deltas", keys, n, oneByOne = true) { (key, i, replyTo) ⇒
+        replicator.tell(Update(key, ORSet(), writeMajority)(_ + i), replyTo)
+      }
+      enterBarrier("after-7")
+    }
+
+    "be good for ORSet deltas" taggedAs PerformanceTest in {
+      val keys = (1 to repeatCount).map(n ⇒ ORSetKey[Int]("F" + n))
+      val n = 200 * factor
+      val writeMajority = WriteMajority(timeout)
+      repeat("ORSet Update deltas", keys, n, oneByOne = false) { (key, i, replyTo) ⇒
+        replicator.tell(Update(key, ORSet(), writeMajority)(_ + i), replyTo)
+      }
+      enterBarrier("after-8")
+    }
+
   }
 }
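
Benchmark note (illustration only): the one-by-one variant waits for each
Update's reply before sending the next, so every write carries the next delta
in sequence and exercises the deterministic target order; the pipelined
variant allows concurrent WriteAggregators and therefore also exercises the
immediate DeltaNack resend. A hypothetical sketch of the invariant behind
both, not the Replicator's actual delta handling:

    // A receiver can apply a delta only if it is the next expected sequence
    // number from that sender; anything else models a DeltaNack, after which
    // the sender falls back to writing the full state.
    final case class Delta(seqNr: Long)

    final class DeltaReceiver {
      private var expectedSeqNr = 1L
      def offer(d: Delta): Boolean =
        if (d.seqNr == expectedSeqNr) { expectedSeqNr += 1; true }
        else false
    }

    object GapDemo extends App {
      val receiver = new DeltaReceiver
      // With randomly picked write targets a node may see seqNr 1 then 3;
      // the gap forces a nack and a full-state write. With a deterministic
      // target order the same nodes receive 1, 2, 3 and every delta applies.
      println(Seq(1L, 3L).map(n ⇒ receiver.offer(Delta(n)))) // List(true, false)
    }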