Add minimum cap for ReadMajority/WriteMajority #21618

This commit is contained in:
inakov 2017-01-18 14:13:40 +02:00
parent 91d6a3f125
commit e043a9fffe
4 changed files with 56 additions and 7 deletions

View file

@ -193,6 +193,8 @@ object Replicator {
Props(new Replicator(settings)).withDeploy(Deploy.local).withDispatcher(settings.dispatcher) Props(new Replicator(settings)).withDeploy(Deploy.local).withDispatcher(settings.dispatcher)
} }
val DefaultMajorityMinCap: Int = 0
sealed trait ReadConsistency { sealed trait ReadConsistency {
def timeout: FiniteDuration def timeout: FiniteDuration
} }
@ -202,7 +204,9 @@ object Replicator {
final case class ReadFrom(n: Int, timeout: FiniteDuration) extends ReadConsistency { final case class ReadFrom(n: Int, timeout: FiniteDuration) extends ReadConsistency {
require(n >= 2, "ReadFrom n must be >= 2, use ReadLocal for n=1") require(n >= 2, "ReadFrom n must be >= 2, use ReadLocal for n=1")
} }
final case class ReadMajority(timeout: FiniteDuration) extends ReadConsistency final case class ReadMajority(timeout: FiniteDuration, minCap: Int = DefaultMajorityMinCap) extends ReadConsistency {
def this(timeout: FiniteDuration) = this(timeout, DefaultMajorityMinCap)
}
final case class ReadAll(timeout: FiniteDuration) extends ReadConsistency final case class ReadAll(timeout: FiniteDuration) extends ReadConsistency
sealed trait WriteConsistency { sealed trait WriteConsistency {
@ -214,7 +218,9 @@ object Replicator {
final case class WriteTo(n: Int, timeout: FiniteDuration) extends WriteConsistency { final case class WriteTo(n: Int, timeout: FiniteDuration) extends WriteConsistency {
require(n >= 2, "WriteTo n must be >= 2, use WriteLocal for n=1") require(n >= 2, "WriteTo n must be >= 2, use WriteLocal for n=1")
} }
final case class WriteMajority(timeout: FiniteDuration) extends WriteConsistency final case class WriteMajority(timeout: FiniteDuration, minCap: Int = DefaultMajorityMinCap) extends WriteConsistency {
def this(timeout: FiniteDuration) = this(timeout, DefaultMajorityMinCap)
}
final case class WriteAll(timeout: FiniteDuration) extends WriteConsistency final case class WriteAll(timeout: FiniteDuration) extends WriteConsistency
/** /**
@ -1489,6 +1495,16 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
private[akka] object ReadWriteAggregator { private[akka] object ReadWriteAggregator {
case object SendToSecondary case object SendToSecondary
val MaxSecondaryNodes = 10 val MaxSecondaryNodes = 10
def calculateMajorityWithMinCap(minCap: Int, numberOfNodes: Int): Int = {
if (numberOfNodes <= minCap) {
numberOfNodes
} else {
val majority = numberOfNodes / 2 + 1
if (majority <= minCap) minCap
else majority
}
}
} }
/** /**
@ -1571,9 +1587,9 @@ private[akka] class WriteAggregator(
override val doneWhenRemainingSize = consistency match { override val doneWhenRemainingSize = consistency match {
case WriteTo(n, _) nodes.size - (n - 1) case WriteTo(n, _) nodes.size - (n - 1)
case _: WriteAll 0 case _: WriteAll 0
case _: WriteMajority case WriteMajority(_, minCap)
val N = nodes.size + 1 val N = nodes.size + 1
val w = N / 2 + 1 // write to at least (N/2+1) nodes val w = calculateMajorityWithMinCap(minCap, N)
N - w N - w
case WriteLocal case WriteLocal
throw new IllegalArgumentException("WriteLocal not supported by WriteAggregator") throw new IllegalArgumentException("WriteLocal not supported by WriteAggregator")
@ -1678,9 +1694,9 @@ private[akka] class ReadAggregator(
override val doneWhenRemainingSize = consistency match { override val doneWhenRemainingSize = consistency match {
case ReadFrom(n, _) nodes.size - (n - 1) case ReadFrom(n, _) nodes.size - (n - 1)
case _: ReadAll 0 case _: ReadAll 0
case _: ReadMajority case ReadMajority(_, minCap)
val N = nodes.size + 1 val N = nodes.size + 1
val r = N / 2 + 1 // read from at least (N/2+1) nodes val r = calculateMajorityWithMinCap(minCap, N)
N - r N - r
case ReadLocal case ReadLocal
throw new IllegalArgumentException("ReadLocal not supported by ReadAggregator") throw new IllegalArgumentException("ReadLocal not supported by ReadAggregator")

View file

@ -162,6 +162,23 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
watch(aggr) watch(aggr)
expectTerminated(aggr) expectTerminated(aggr)
} }
"calculate majority with minCap" in {
val minCap = 5
import ReadWriteAggregator._
calculateMajorityWithMinCap(minCap, 3) should be (3)
calculateMajorityWithMinCap(minCap, 4) should be (4)
calculateMajorityWithMinCap(minCap, 5) should be (5)
calculateMajorityWithMinCap(minCap, 6) should be (5)
calculateMajorityWithMinCap(minCap, 7) should be (5)
calculateMajorityWithMinCap(minCap, 8) should be (5)
calculateMajorityWithMinCap(minCap, 9) should be (5)
calculateMajorityWithMinCap(minCap, 10) should be (6)
calculateMajorityWithMinCap(minCap, 11) should be (6)
calculateMajorityWithMinCap(minCap, 12) should be (7)
}
} }
"Durable WriteAggregator" must { "Durable WriteAggregator" must {

View file

@ -186,6 +186,14 @@ The ``Replicator`` writes and reads to a majority of replicas, i.e. **N / 2 + 1*
in a 5 node cluster it writes to 3 nodes and reads from 3 nodes. In a 6 node cluster it writes in a 5 node cluster it writes to 3 nodes and reads from 3 nodes. In a 6 node cluster it writes
to 4 nodes and reads from 4 nodes. to 4 nodes and reads from 4 nodes.
You can define a minimum number of nodes for ``WriteMajority`` and ``ReadMajority``,
this will minimize the risk of reading steal data. Minimum cap is
provided by minCap property of ``WriteMajority`` and ``ReadMajority`` and defines the required majority.
If the minCap is higher then **N / 2 + 1** the minCap will be used.
For example if the minCap is 5 the ``WriteMajority`` and ``ReadMajority`` for cluster of 3 nodes will be 3, for
cluster of 6 nodes will be 5 and for cluster of 12 nodes will be 7(**N / 2 + 1**).
Here is an example of using ``WriteMajority`` and ``ReadMajority``: Here is an example of using ``WriteMajority`` and ``ReadMajority``:
.. includecode:: ../../../akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingCart.scala#read-write-majority .. includecode:: ../../../akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingCart.scala#read-write-majority

View file

@ -185,7 +185,15 @@ object MiMa extends AutoPlugin {
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#DataDeleted.apply"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#DataDeleted.apply"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#DataDeleted.copy"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#DataDeleted.copy"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#DataDeleted.this"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#DataDeleted.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#Delete.copy") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#Delete.copy"),
// #21618 distributed data
ProblemFilters.exclude[MissingTypesProblem]("akka.cluster.ddata.Replicator$ReadMajority$"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#ReadMajority.copy"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#ReadMajority.apply"),
ProblemFilters.exclude[MissingTypesProblem]("akka.cluster.ddata.Replicator$WriteMajority$"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#WriteMajority.copy"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#WriteMajority.apply")
) )
Map( Map(