Exclude exiting members in Read/Write MajorityPlus, #30327 (#30328)

* this saves at least 2 seconds where the coordinator is not able to respond
  when the oldest node is shut down
* also reduce default write-majority-plus for sharding, introduced in #28856
Patrik Nordwall 2021-08-18 15:34:30 +02:00 committed by GitHub
parent c958822608
commit 0305a5f05a
3 changed files with 47 additions and 17 deletions


@@ -193,7 +193,7 @@ akka.cluster.sharding {
# is to have more tolerance for membership changes between write and read.
# The tradeoff of increasing this is that updates will be slower.
# It is more important to increase the `read-majority-plus`.
write-majority-plus = 5
write-majority-plus = 3
# State retrieval when ShardCoordinator is started is required to be read
# from a majority of nodes plus this number of additional nodes. Can also
# be set to "all" to require reads from all nodes. The reason for write/read


@@ -491,7 +491,8 @@ object Replicator {
/**
* `ReadMajority` but with the given number of `additional` nodes added to the majority count. At most
* all nodes.
* all nodes. Exiting nodes are excluded when using `ReadMajorityPlus` because those are typically
* about to be removed and will not be able to respond.
*/
final case class ReadMajorityPlus(timeout: FiniteDuration, additional: Int, minCap: Int = DefaultMajorityMinCap)
extends ReadConsistency {
@@ -535,7 +536,8 @@ object Replicator {
/**
* `WriteMajority` but with the given number of `additional` nodes added to the majority count. At most
* all nodes.
* all nodes. Exiting nodes are excluded when using `WriteMajorityPlus` because those are typically
* about to be removed and will not be able to respond.
*/
final case class WriteMajorityPlus(timeout: FiniteDuration, additional: Int, minCap: Int = DefaultMajorityMinCap)
extends WriteConsistency {
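For orientation, a small sketch of the semantics these Scaladocs describe: a cluster majority, bounded below by `minCap`, with `additional` nodes added, and capped at all (non-exiting) nodes. This is an illustration of the documented behavior, not the Replicator's internal calculation.

```scala
// Illustration only (assumed semantics, not Akka's internal code):
// how many nodes a MajorityPlus read or write must reach.
def majorityPlus(numberOfNodes: Int, additional: Int, minCap: Int): Int = {
  val majority = math.max(numberOfNodes / 2 + 1, minCap)
  math.min(numberOfNodes, majority + additional)
}

// Example: 7 reachable non-exiting nodes, additional = 2, minCap = 5
// => max(7 / 2 + 1, 5) + 2 = 7, i.e. effectively all nodes.
```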
@@ -1460,6 +1462,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
// cluster joining nodes, doesn't contain selfAddress
var joiningNodes: immutable.SortedSet[UniqueAddress] = immutable.SortedSet.empty
// cluster exiting nodes, doesn't contain selfAddress
var exitingNodes: immutable.SortedSet[UniqueAddress] = immutable.SortedSet.empty
// up and weaklyUp nodes, doesn't contain joining and not selfAddress
private def allNodes: immutable.SortedSet[UniqueAddress] = nodes.union(weaklyUpNodes)
@@ -1501,11 +1506,16 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
// messages after loading durable data.
var replyTo: ActorRef = null
private def nodesForReadWrite(): Vector[UniqueAddress] = {
if (settings.preferOldest)
membersByAge.iterator.map(_.uniqueAddress).toVector
else
nodes.toVector
private def nodesForReadWrite(excludeExiting: Boolean): Vector[UniqueAddress] = {
if (excludeExiting && exitingNodes.nonEmpty) {
if (settings.preferOldest) membersByAge.iterator.collect {
case m if !exitingNodes.contains(m.uniqueAddress) => m.uniqueAddress
}.toVector
else nodes.diff(exitingNodes).toVector
} else {
if (settings.preferOldest) membersByAge.iterator.map(_.uniqueAddress).toVector
else nodes.toVector
}
}
override protected[akka] def aroundReceive(rcv: Actor.Receive, msg: Any): Unit = {
@@ -1661,6 +1671,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
case MemberJoined(m) => receiveMemberJoining(m)
case MemberWeaklyUp(m) => receiveMemberWeaklyUp(m)
case MemberUp(m) => receiveMemberUp(m)
case MemberExited(m) => receiveMemberExiting(m)
case MemberRemoved(m, _) => receiveMemberRemoved(m)
case evt: MemberEvent => receiveOtherMemberEvent(evt.member)
case UnreachableMember(m) => receiveUnreachable(m)
@@ -1683,6 +1694,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}
replyTo ! reply
} else {
val excludeExiting = consistency match {
case _: ReadMajorityPlus | _: ReadAll => true
case _ => false
}
context.actorOf(
ReadAggregator
.props(
@@ -1690,7 +1705,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
consistency,
req,
selfUniqueAddress,
nodesForReadWrite(),
nodesForReadWrite(excludeExiting),
unreachable,
!settings.preferOldest,
localValue,
@@ -1783,6 +1798,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
// of subsequent updates are in sync on the destination nodes.
// The order is also kept when prefer-oldest is enabled.
val shuffle = !(settings.preferOldest || writeDelta.exists(_.requiresCausalDeliveryOfDeltas))
val excludeExiting = writeConsistency match {
case _: WriteMajorityPlus | _: WriteAll => true
case _ => false
}
val writeAggregator =
context.actorOf(
WriteAggregator
@@ -1793,7 +1812,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
writeConsistency,
req,
selfUniqueAddress,
nodesForReadWrite(),
nodesForReadWrite(excludeExiting),
unreachable,
shuffle,
replyTo,
@@ -1901,6 +1920,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
else
replyTo ! DeleteSuccess(key, req)
} else {
val excludeExiting = consistency match {
case _: WriteMajorityPlus | _: WriteAll => true
case _ => false
}
val writeAggregator =
context.actorOf(
WriteAggregator
@@ -1911,7 +1934,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
consistency,
req,
selfUniqueAddress,
nodesForReadWrite(),
nodesForReadWrite(excludeExiting),
unreachable,
!settings.preferOldest,
replyTo,
@@ -2322,6 +2345,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
}
}
def receiveMemberExiting(m: Member): Unit =
if (matchingRole(m) && m.address != selfAddress)
exitingNodes += m.uniqueAddress
def receiveMemberRemoved(m: Member): Unit = {
if (m.address == selfAddress)
context.stop(self)
@@ -2332,6 +2359,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
nodes -= m.uniqueAddress
weaklyUpNodes -= m.uniqueAddress
joiningNodes -= m.uniqueAddress
exitingNodes -= m.uniqueAddress
removedNodes = removedNodes.updated(m.uniqueAddress, allReachableClockTime)
unreachable -= m.uniqueAddress
if (settings.preferOldest)


@@ -229,9 +229,10 @@ at least **N/2 + 1** replicas, where N is the number of nodes in the cluster
(or cluster role group)
* `WriteMajorityPlus` is like `WriteMajority` but with the given number of `additional` nodes added
to the majority count. At most all nodes. This gives better tolerance for membership changes between
writes and reads.
* `WriteAll` the value will immediately be written to all nodes in the cluster
(or all nodes in the cluster role group)
writes and reads. Exiting nodes are excluded when using `WriteMajorityPlus` because those are typically about to be removed
and will not be able to respond.
* `WriteAll` the value will immediately be written to all nodes in the cluster (or all nodes in the cluster role group).
Exiting nodes are excluded when using `WriteAll` because those are typically about to be removed and will not be able to respond.
When you specify to write to `n` out of `x` nodes, the update will first replicate to `n` nodes.
If there are not enough Acks after a 1/5th of the timeout, the update will be replicated to `n` other
@@ -263,9 +264,10 @@ at least **N/2 + 1** replicas, where N is the number of nodes in the cluster
(or cluster role group)
* `ReadMajorityPlus` is like `ReadMajority` but with the given number of `additional` nodes added
to the majority count. At most all nodes. This gives better tolerance for membership changes between
writes and reads.
* `ReadAll` the value will be read and merged from all nodes in the cluster
(or all nodes in the cluster role group)
writes and reads. Exiting nodes are excluded when using `ReadMajorityPlus` because those are typically about to be
removed and will not be able to respond.
* `ReadAll` the value will be read and merged from all nodes in the cluster (or all nodes in the cluster role group).
Exiting nodes are excluded when using `ReadAll` because those are typically about to be removed and will not be able to respond.
Note that `ReadMajority` and `ReadMajorityPlus` have a `minCap` parameter that is useful to specify to achieve
better safety for small clusters.
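As a usage illustration of these consistency levels with the classic Distributed Data API, a minimal sketch follows; the actor system name, key name, timeouts, and `additional` values are arbitrary examples, and the node is assumed to be configured with the cluster actor provider.

```scala
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.cluster.ddata.{ DistributedData, GCounter, GCounterKey, SelfUniqueAddress }
import akka.cluster.ddata.Replicator._

object MajorityPlusExample extends App {
  val system = ActorSystem("DDataExample")
  implicit val node: SelfUniqueAddress = DistributedData(system).selfUniqueAddress
  val replicator = DistributedData(system).replicator

  val CounterKey = GCounterKey("example-counter")

  // Write to a majority plus 2 additional nodes; with this change, Exiting
  // members are not counted among the target nodes.
  replicator ! Update(CounterKey, GCounter.empty, WriteMajorityPlus(timeout = 3.seconds, additional = 2))(_ :+ 1)

  // Read from a majority plus 2 additional nodes, likewise excluding Exiting members.
  // In a real application the replies (UpdateSuccess, GetSuccess, etc.) would be
  // handled by a requesting actor rather than going to dead letters.
  replicator ! Get(CounterKey, ReadMajorityPlus(timeout = 3.seconds, additional = 2))
}
```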