diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala index 23c997e705..c5da8ae796 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala @@ -193,6 +193,8 @@ object Replicator { Props(new Replicator(settings)).withDeploy(Deploy.local).withDispatcher(settings.dispatcher) } + val DefaultMajorityMinCap: Int = 0 + sealed trait ReadConsistency { def timeout: FiniteDuration } @@ -202,7 +204,9 @@ object Replicator { final case class ReadFrom(n: Int, timeout: FiniteDuration) extends ReadConsistency { require(n >= 2, "ReadFrom n must be >= 2, use ReadLocal for n=1") } - final case class ReadMajority(timeout: FiniteDuration) extends ReadConsistency + final case class ReadMajority(timeout: FiniteDuration, minCap: Int = DefaultMajorityMinCap) extends ReadConsistency { + def this(timeout: FiniteDuration) = this(timeout, DefaultMajorityMinCap) + } final case class ReadAll(timeout: FiniteDuration) extends ReadConsistency sealed trait WriteConsistency { @@ -214,7 +218,9 @@ object Replicator { final case class WriteTo(n: Int, timeout: FiniteDuration) extends WriteConsistency { require(n >= 2, "WriteTo n must be >= 2, use WriteLocal for n=1") } - final case class WriteMajority(timeout: FiniteDuration) extends WriteConsistency + final case class WriteMajority(timeout: FiniteDuration, minCap: Int = DefaultMajorityMinCap) extends WriteConsistency { + def this(timeout: FiniteDuration) = this(timeout, DefaultMajorityMinCap) + } final case class WriteAll(timeout: FiniteDuration) extends WriteConsistency /** @@ -1489,6 +1495,16 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog private[akka] object ReadWriteAggregator { case object SendToSecondary val MaxSecondaryNodes = 10 + + def calculateMajorityWithMinCap(minCap: Int, numberOfNodes: Int): Int = { + if (numberOfNodes <= minCap) { + numberOfNodes + } else { + val majority = numberOfNodes / 2 + 1 + if (majority <= minCap) minCap + else majority + } + } } /** @@ -1571,9 +1587,9 @@ private[akka] class WriteAggregator( override val doneWhenRemainingSize = consistency match { case WriteTo(n, _) ⇒ nodes.size - (n - 1) case _: WriteAll ⇒ 0 - case _: WriteMajority ⇒ + case WriteMajority(_, minCap) ⇒ val N = nodes.size + 1 - val w = N / 2 + 1 // write to at least (N/2+1) nodes + val w = calculateMajorityWithMinCap(minCap, N) N - w case WriteLocal ⇒ throw new IllegalArgumentException("WriteLocal not supported by WriteAggregator") @@ -1678,9 +1694,9 @@ private[akka] class ReadAggregator( override val doneWhenRemainingSize = consistency match { case ReadFrom(n, _) ⇒ nodes.size - (n - 1) case _: ReadAll ⇒ 0 - case _: ReadMajority ⇒ + case ReadMajority(_, minCap) ⇒ val N = nodes.size + 1 - val r = N / 2 + 1 // read from at least (N/2+1) nodes + val r = calculateMajorityWithMinCap(minCap, N) N - r case ReadLocal ⇒ throw new IllegalArgumentException("ReadLocal not supported by ReadAggregator") diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala index 0fda3f28a0..8a4fcf2686 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala @@ -162,6 +162,23 @@ class WriteAggregatorSpec extends AkkaSpec(s""" watch(aggr) expectTerminated(aggr) } + + "calculate majority with minCap" in { + val minCap = 5 + + import ReadWriteAggregator._ + + calculateMajorityWithMinCap(minCap, 3) should be (3) + calculateMajorityWithMinCap(minCap, 4) should be (4) + calculateMajorityWithMinCap(minCap, 5) should be (5) + calculateMajorityWithMinCap(minCap, 6) should be (5) + calculateMajorityWithMinCap(minCap, 7) should be (5) + calculateMajorityWithMinCap(minCap, 8) should be (5) + calculateMajorityWithMinCap(minCap, 9) should be (5) + calculateMajorityWithMinCap(minCap, 10) should be (6) + calculateMajorityWithMinCap(minCap, 11) should be (6) + calculateMajorityWithMinCap(minCap, 12) should be (7) + } } "Durable WriteAggregator" must { diff --git a/akka-docs/rst/scala/distributed-data.rst b/akka-docs/rst/scala/distributed-data.rst index ae217f3e88..5c4be461f6 100644 --- a/akka-docs/rst/scala/distributed-data.rst +++ b/akka-docs/rst/scala/distributed-data.rst @@ -186,6 +186,14 @@ The ``Replicator`` writes and reads to a majority of replicas, i.e. **N / 2 + 1* in a 5 node cluster it writes to 3 nodes and reads from 3 nodes. In a 6 node cluster it writes to 4 nodes and reads from 4 nodes. +You can define a minimum number of nodes for ``WriteMajority`` and ``ReadMajority``, +this will minimize the risk of reading steal data. Minimum cap is +provided by minCap property of ``WriteMajority`` and ``ReadMajority`` and defines the required majority. +If the minCap is higher then **N / 2 + 1** the minCap will be used. + +For example if the minCap is 5 the ``WriteMajority`` and ``ReadMajority`` for cluster of 3 nodes will be 3, for +cluster of 6 nodes will be 5 and for cluster of 12 nodes will be 7(**N / 2 + 1**). + Here is an example of using ``WriteMajority`` and ``ReadMajority``: .. includecode:: ../../../akka-samples/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingCart.scala#read-write-majority diff --git a/project/MiMa.scala b/project/MiMa.scala index c74034dec0..eddaa24f4d 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -185,7 +185,15 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#DataDeleted.apply"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#DataDeleted.copy"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#DataDeleted.this"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#Delete.copy") + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#Delete.copy"), + + // #21618 distributed data + ProblemFilters.exclude[MissingTypesProblem]("akka.cluster.ddata.Replicator$ReadMajority$"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#ReadMajority.copy"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#ReadMajority.apply"), + ProblemFilters.exclude[MissingTypesProblem]("akka.cluster.ddata.Replicator$WriteMajority$"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#WriteMajority.copy"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.Replicator#WriteMajority.apply") ) Map(