Improve performance of DData delta updates, #25310

* Use a deterministic order of the target nodes for the writes when
  the data type requires causal delivery of deltas
  (RequiresCausalDeliveryOfDeltas); otherwise the random pick of
  targets caused delta sequence numbers to be missing for subsequent
  updates (see the sketch below)
* Resend immediately when receiving DeltaNack instead of waiting
  for the timeout. DeltaNack can happen when there are multiple
  concurrent updates from the same node, because each starts a
  WriteAggregator and a later Update might bypass an earlier one
Patrik Nordwall 2018-07-11 13:14:25 +02:00 committed by Johan Andrén
parent db48514b88
commit 180ef934bc
3 changed files with 59 additions and 8 deletions
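
To make the first bullet concrete, the following is a minimal standalone sketch, not the Replicator's actual code: plain host:port strings stand in for akka.actor.Address, and primaryAndSecondary, maxSecondaryNodes and requiresCausalDelivery are illustrative names. With a deterministic order, consecutive WriteAggregators from the same node pick the same primary targets, so those nodes see every delta sequence number; with the random shuffle, a target may be skipped by one update and then reject the next delta because a sequence number is missing.

import scala.util.Random

object NodeOrderingSketch {
  type Node = String // stand-in for akka.actor.Address ("host:port" sorts deterministically)

  val maxSecondaryNodes = 10 // cap on delayed (secondary) targets, like the Replicator's MaxSecondaryNodes

  // Pick the nodes that get the write immediately (primary) and the ones that only
  // get it after a delay (secondary), preferring reachable nodes over unreachable ones.
  def primaryAndSecondary(
      reachable: Set[Node],
      unreachable: Set[Node],
      primarySize: Int,
      requiresCausalDelivery: Boolean): (Vector[Node], Vector[Node]) = {
    val ordered =
      if (requiresCausalDelivery)
        // Deterministic: every update from this node targets the same primaries,
        // so their delta sequence numbers stay consecutive.
        reachable.toVector.sorted ++ unreachable.toVector.sorted
      else
        // Random: spreads load when delta ordering does not matter.
        Random.shuffle(reachable.toVector) ++ Random.shuffle(unreachable.toVector)
    val (primary, secondary) = ordered.splitAt(primarySize)
    (primary, secondary.take(maxSecondaryNodes))
  }

  def main(args: Array[String]): Unit = {
    val reachable = Set("c:2552", "a:2552", "b:2552")
    // Deterministic: always (Vector(a:2552, b:2552), Vector(c:2552))
    println(primaryAndSecondary(reachable, Set.empty, primarySize = 2, requiresCausalDelivery = true))
    // Random: the primary pair changes from call to call
    println(primaryAndSecondary(reachable, Set.empty, primarySize = 2, requiresCausalDelivery = false))
  }
}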

@@ -1,3 +1,7 @@
# #25310 Improve performance of DData delta updates
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.ReadWriteAggregator.secondaryNodes")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.ReadWriteAggregator.primaryNodes")
# #23703 Optimized serializer for ORSet[ActorRef]
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages#GSetOrBuilder.getActorRefElementsCount")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ddata.protobuf.msg.ReplicatedDataMessages#GSetOrBuilder.getActorRefElementsList")
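
The filters added above tell MiMa to ignore binary-compatibility errors for the internal ReadWriteAggregator.primaryNodes/secondaryNodes accessors that this commit removes in favor of a parameterized primaryAndSecondaryNodes method. For orientation only, a sketch of how the same exclusions would read if declared directly in an sbt build with the MiMa plugin enabled (the filter file shown above serves the same purpose):

// build.sbt sketch (assumes sbt-mima-plugin is enabled for the project)
import com.typesafe.tools.mima.core._

mimaBinaryIssueFilters ++= Seq(
  // #25310 Improve performance of DData delta updates:
  // internal primaryNodes/secondaryNodes accessors were removed
  ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.ReadWriteAggregator.secondaryNodes"),
  ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.ReadWriteAggregator.primaryNodes")
)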

@@ -1948,13 +1948,17 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
def doneWhenRemainingSize: Int
lazy val (primaryNodes, secondaryNodes) = {
def primaryAndSecondaryNodes(requiresCausalDeliveryOfDeltas: Boolean): (Vector[Address], Vector[Address]) = {
val primarySize = nodes.size - doneWhenRemainingSize
if (primarySize >= nodes.size)
(nodes, Set.empty[Address])
(nodes.toVector, Vector.empty[Address])
else {
// Prefer to use reachable nodes over the unreachable nodes first
val orderedNodes = scala.util.Random.shuffle(reachableNodes.toVector) ++ scala.util.Random.shuffle(unreachable.toVector)
// Prefer to use reachable nodes over the unreachable nodes first.
// When RequiresCausalDeliveryOfDeltas use deterministic order so that sequence numbers of subsequent
// updates are in sync on the destination nodes.
val orderedNodes =
if (requiresCausalDeliveryOfDeltas) reachableNodes.toVector.sorted ++ unreachable.toVector.sorted
else scala.util.Random.shuffle(reachableNodes.toVector) ++ scala.util.Random.shuffle(unreachable.toVector)
val (p, s) = orderedNodes.splitAt(primarySize)
(p, s.take(MaxSecondaryNodes))
}
@@ -2030,6 +2034,14 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
var gotLocalStoreReply = !durable
var gotWriteNackFrom = Set.empty[Address]
private val (primaryNodes, secondaryNodes) = {
val requiresCausalDeliveryOfDeltas = delta match {
case None ⇒ false
case Some(d) ⇒ d.dataEnvelope.data.isInstanceOf[RequiresCausalDeliveryOfDeltas]
}
primaryAndSecondaryNodes(requiresCausalDeliveryOfDeltas)
}
override def preStart(): Unit = {
val msg = deltaMsg match {
case Some(d) ⇒ d
@@ -2048,7 +2060,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
gotWriteNackFrom += senderAddress()
if (isDone) reply(isTimeout = false)
case DeltaNack ⇒
// ok, will be retried with full state
// Deltas must be applied in order and we can't keep track of ordering of
// simultaneous updates so there is a chance that the delta could not be applied.
// Try again with the full state
sender() ! writeMsg
case _: Replicator.UpdateSuccess[_] ⇒
gotLocalStoreReply = true
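
The DeltaNack branch above implements the second bullet of the commit message: instead of letting the write time out, the aggregator immediately falls back to the full-state write. Below is a minimal sketch of that resend-on-nack pattern with classic actors; the message types and class name are stand-ins, not the Replicator's internal protocol.

import akka.actor.{ Actor, ActorRef }

object ResendOnNackSketch {
  // Stand-ins for the Replicator's internal write/delta/ack/nack messages
  final case class FullStateWrite(payload: Any)
  final case class DeltaWrite(payload: Any)
  case object WriteAck
  case object DeltaNack
}

// Not the real WriteAggregator: just the resend-on-nack behaviour in isolation.
class ResendOnNackSketch(
    replicas: Vector[ActorRef],
    deltaMsg: ResendOnNackSketch.DeltaWrite,
    fullStateMsg: ResendOnNackSketch.FullStateWrite)
    extends Actor {
  import ResendOnNackSketch._

  // Optimistic first attempt: send the (cheap) delta to all selected replicas.
  override def preStart(): Unit =
    replicas.foreach(_ ! deltaMsg)

  def receive: Receive = {
    case WriteAck =>
      // ack bookkeeping and "done" detection elided
    case DeltaNack =>
      // The replica could not apply the delta, e.g. because a concurrent update from
      // this node reached it first. Resend the full state right away instead of
      // waiting for the write timeout to expire.
      sender() ! fullStateMsg
  }
}
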
@@ -2148,6 +2163,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
val readMsg = Read(key.id)
private val (primaryNodes, secondaryNodes) = {
primaryAndSecondaryNodes(requiresCausalDeliveryOfDeltas = false)
}
override def preStart(): Unit = {
primaryNodes.foreach { replica(_) ! readMsg }

@@ -27,6 +27,7 @@ object PerformanceSpec extends MultiNodeConfig {
commonConfig(ConfigFactory.parseString(s"""
akka.loglevel = ERROR
akka.stdout-loglevel = ERROR
akka.loggers = ["akka.event.Logging$$DefaultLogger"]
akka.actor.provider = "cluster"
akka.log-dead-letters = off
akka.log-dead-letters-during-shutdown = off
@@ -81,21 +82,28 @@ class PerformanceSpec extends MultiNodeSpec(PerformanceSpec) with STMultiNodeSpe
enterBarrier(from.name + "-joined")
}
def repeat(description: String, keys: Iterable[ORSetKey[Int]], n: Int, expectedAfterReplication: Option[Set[Int]] = None)(
def repeat(description: String, keys: Iterable[ORSetKey[Int]], n: Int,
expectedAfterReplication: Option[Set[Int]] = None, oneByOne: Boolean = false)(
block: (ORSetKey[Int], Int, ActorRef) ⇒ Unit, afterEachKey: ORSetKey[Int] ⇒ Unit = _ ⇒ ()): Unit = {
keys.foreach { key ⇒
val startTime = System.nanoTime()
runOn(n1) {
val latch = TestLatch(n)
val replyTo = system.actorOf(countDownProps(latch))
val oneByOneProbe = TestProbe()
val replyTo =
if (oneByOne) oneByOneProbe.ref
else system.actorOf(countDownProps(latch))
var i = 0
while (i < n) {
block(key, i, replyTo)
i += 1
if (oneByOne)
oneByOneProbe.receiveOne(timeout)
}
Await.ready(latch, 10.seconds + (2.second * factor))
if (!oneByOne)
Await.ready(latch, 10.seconds + (2.second * factor))
}
expectedAfterReplication.foreach { expected ⇒
enterBarrier("repeat-" + key + "-before-awaitReplicated")
@@ -234,6 +242,26 @@ class PerformanceSpec extends MultiNodeSpec(PerformanceSpec) with STMultiNodeSpe
enterBarrier("after-6")
}
"be good for ORSet one-by-one deltas" taggedAs PerformanceTest in {
val keys = (1 to repeatCount).map(n ⇒ ORSetKey[Int]("E" + n))
val n = 300 * factor
val writeMajority = WriteMajority(timeout)
repeat("ORSet Update one-by-one deltas", keys, n, oneByOne = true) { (key, i, replyTo)
replicator.tell(Update(key, ORSet(), writeMajority)(_ + i), replyTo)
}
enterBarrier("after-7")
}
"be good for ORSet deltas" taggedAs PerformanceTest in {
val keys = (1 to repeatCount).map(n ⇒ ORSetKey[Int]("F" + n))
val n = 200 * factor
val writeMajority = WriteMajority(timeout)
repeat("ORSet Update deltas", keys, n, oneByOne = false) { (key, i, replyTo)
replicator.tell(Update(key, ORSet(), writeMajority)(_ + i), replyTo)
}
enterBarrier("after-8")
}
}
}
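
The new one-by-one test waits for each acknowledgement before sending the next Update. Outside the multi-node test harness the same client-side pattern looks roughly like this sketch, assuming Akka 2.5; OneByOneUpdatesSketch and the single-node configuration are illustrative and not part of the commit.

import akka.actor.ActorSystem
import akka.cluster.Cluster
import akka.cluster.ddata.{ DistributedData, ORSet, ORSetKey }
import akka.cluster.ddata.Replicator.{ Update, UpdateResponse, WriteMajority }
import akka.pattern.ask
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import scala.concurrent.Await
import scala.concurrent.duration._

object OneByOneUpdatesSketch {
  def main(args: Array[String]): Unit = {
    // Single-node stand-in for the multi-node test cluster (Akka 2.5 classic remoting assumed).
    val config = ConfigFactory.parseString("""
      akka.actor.provider = cluster
      akka.remote.netty.tcp.port = 0
      """)
    val system = ActorSystem("sketch", config)
    implicit val cluster: Cluster = Cluster(system) // ORSet's + needs the local Cluster in Akka 2.5
    cluster.join(cluster.selfAddress)

    implicit val askTimeout: Timeout = 10.seconds
    val replicator = DistributedData(system).replicator
    val key = ORSetKey[Int]("E1")
    val writeMajority = WriteMajority(5.seconds)

    // One-by-one: wait for each UpdateResponse before sending the next Update,
    // so every update can be propagated as a small delta on top of acknowledged state.
    (0 until 10).foreach { i =>
      val done = (replicator ? Update(key, ORSet.empty[Int], writeMajority)(_ + i))
        .mapTo[UpdateResponse[ORSet[Int]]]
      Await.result(done, 15.seconds)
    }

    system.terminate()
  }
}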