diff --git a/akka-cluster-sharding/src/main/resources/reference.conf b/akka-cluster-sharding/src/main/resources/reference.conf
index 3ef539382b..6b96332ee9 100644
--- a/akka-cluster-sharding/src/main/resources/reference.conf
+++ b/akka-cluster-sharding/src/main/resources/reference.conf
@@ -193,7 +193,7 @@ akka.cluster.sharding {
     # is to have more tolerance for membership changes between write and read.
     # The tradeoff of increasing this is that updates will be slower.
     # It is more important to increase the `read-majority-plus`.
-    write-majority-plus = 5
+    write-majority-plus = 3
     # State retrieval when ShardCoordinator is started is required to be read
     # from a majority of nodes plus this number of additional nodes. Can also
    # be set to "all" to require reads from all nodes. The reason for write/read
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala
index 853ae45500..865093971f 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala
@@ -491,7 +491,8 @@ object Replicator {

   /**
    * `ReadMajority` but with the given number of `additional` nodes added to the majority count. At most
-   * all nodes.
+   * all nodes. Exiting nodes are excluded when using `ReadMajorityPlus` because they are typically
+   * about to be removed and will not be able to respond.
    */
   final case class ReadMajorityPlus(timeout: FiniteDuration, additional: Int, minCap: Int = DefaultMajorityMinCap)
      extends ReadConsistency {
@@ -535,7 +536,8 @@ object Replicator {

   /**
    * `WriteMajority` but with the given number of `additional` nodes added to the majority count. At most
-   * all nodes.
+   * all nodes. Exiting nodes are excluded when using `WriteMajorityPlus` because they are typically
+   * about to be removed and will not be able to respond.
    */
   final case class WriteMajorityPlus(timeout: FiniteDuration, additional: Int, minCap: Int = DefaultMajorityMinCap)
      extends WriteConsistency {
@@ -1460,6 +1462,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
   // cluster joining nodes, doesn't contain selfAddress
   var joiningNodes: immutable.SortedSet[UniqueAddress] = immutable.SortedSet.empty

+  // cluster exiting nodes, doesn't contain selfAddress
+  var exitingNodes: immutable.SortedSet[UniqueAddress] = immutable.SortedSet.empty
+
   // up and weaklyUp nodes, doesn't contain joining and not selfAddress
   private def allNodes: immutable.SortedSet[UniqueAddress] = nodes.union(weaklyUpNodes)

@@ -1501,11 +1506,16 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
   // messages after loading durable data.
   var replyTo: ActorRef = null

-  private def nodesForReadWrite(): Vector[UniqueAddress] = {
-    if (settings.preferOldest)
-      membersByAge.iterator.map(_.uniqueAddress).toVector
-    else
-      nodes.toVector
+  private def nodesForReadWrite(excludeExiting: Boolean): Vector[UniqueAddress] = {
+    if (excludeExiting && exitingNodes.nonEmpty) {
+      if (settings.preferOldest) membersByAge.iterator.collect {
+        case m if !exitingNodes.contains(m.uniqueAddress) => m.uniqueAddress
+      }.toVector
+      else nodes.diff(exitingNodes).toVector
+    } else {
+      if (settings.preferOldest) membersByAge.iterator.map(_.uniqueAddress).toVector
+      else nodes.toVector
+    }
   }

   override protected[akka] def aroundReceive(rcv: Actor.Receive, msg: Any): Unit = {
@@ -1661,6 +1671,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
     case MemberJoined(m) => receiveMemberJoining(m)
     case MemberWeaklyUp(m) => receiveMemberWeaklyUp(m)
     case MemberUp(m) => receiveMemberUp(m)
+    case MemberExited(m) => receiveMemberExiting(m)
     case MemberRemoved(m, _) => receiveMemberRemoved(m)
     case evt: MemberEvent => receiveOtherMemberEvent(evt.member)
     case UnreachableMember(m) => receiveUnreachable(m)
@@ -1683,6 +1694,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
       }
       replyTo ! reply
     } else {
+      val excludeExiting = consistency match {
+        case _: ReadMajorityPlus | _: ReadAll => true
+        case _ => false
+      }
       context.actorOf(
         ReadAggregator
           .props(
@@ -1690,7 +1705,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
             consistency,
             req,
             selfUniqueAddress,
-            nodesForReadWrite(),
+            nodesForReadWrite(excludeExiting),
             unreachable,
             !settings.preferOldest,
             localValue,
@@ -1783,6 +1798,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
       // of subsequent updates are in sync on the destination nodes.
       // The order is also kept when prefer-oldest is enabled.
       val shuffle = !(settings.preferOldest || writeDelta.exists(_.requiresCausalDeliveryOfDeltas))
+      val excludeExiting = writeConsistency match {
+        case _: WriteMajorityPlus | _: WriteAll => true
+        case _ => false
+      }
       val writeAggregator = context.actorOf(
         WriteAggregator
           .props(
@@ -1793,7 +1812,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
             writeConsistency,
             req,
             selfUniqueAddress,
-            nodesForReadWrite(),
+            nodesForReadWrite(excludeExiting),
             unreachable,
             shuffle,
             replyTo,
@@ -1901,6 +1920,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
         else
           replyTo ! DeleteSuccess(key, req)
       } else {
+        val excludeExiting = consistency match {
+          case _: WriteMajorityPlus | _: WriteAll => true
+          case _ => false
+        }
        val writeAggregator =
           context.actorOf(
             WriteAggregator
@@ -1911,7 +1934,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
               consistency,
               req,
               selfUniqueAddress,
-              nodesForReadWrite(),
+              nodesForReadWrite(excludeExiting),
               unreachable,
               !settings.preferOldest,
               replyTo,
@@ -2322,6 +2345,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
     }
   }

+  def receiveMemberExiting(m: Member): Unit =
+    if (matchingRole(m) && m.address != selfAddress)
+      exitingNodes += m.uniqueAddress
+
   def receiveMemberRemoved(m: Member): Unit = {
     if (m.address == selfAddress)
       context.stop(self)
@@ -2332,6 +2359,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
       nodes -= m.uniqueAddress
       weaklyUpNodes -= m.uniqueAddress
       joiningNodes -= m.uniqueAddress
+      exitingNodes -= m.uniqueAddress
       removedNodes = removedNodes.updated(m.uniqueAddress, allReachableClockTime)
       unreachable -= m.uniqueAddress
       if (settings.preferOldest)
diff --git a/akka-docs/src/main/paradox/typed/distributed-data.md b/akka-docs/src/main/paradox/typed/distributed-data.md
index d61a6bba08..3cb12d9fe6 100644
--- a/akka-docs/src/main/paradox/typed/distributed-data.md
+++ b/akka-docs/src/main/paradox/typed/distributed-data.md
@@ -229,9 +229,10 @@ at least **N/2 + 1** replicas, where N is the number of nodes in the cluster
 (or cluster role group)
  * `WriteMajorityPlus` is like `WriteMajority` but with the given number of `additional`
    nodes added to the majority count. At most all nodes. This gives better tolerance for membership changes between
-   writes and reads.
- * `WriteAll` the value will immediately be written to all nodes in the cluster
-(or all nodes in the cluster role group)
+   writes and reads. Exiting nodes are excluded when using `WriteMajorityPlus` because they are typically about to be removed
+   and will not be able to respond.
+ * `WriteAll` the value will immediately be written to all nodes in the cluster (or all nodes in the cluster role group).
+   Exiting nodes are excluded when using `WriteAll` because they are typically about to be removed and will not be able to respond.

 When you specify to write to `n` out of `x` nodes, the update will first replicate to `n` nodes.
 If there are not enough Acks after a 1/5th of the timeout, the update will be replicated to `n` other
@@ -263,9 +264,10 @@ at least **N/2 + 1** replicas, where N is the number of nodes in the cluster
 (or cluster role group)
  * `ReadMajorityPlus` is like `ReadMajority` but with the given number of `additional`
    nodes added to the majority count. At most all nodes. This gives better tolerance for membership changes between
-   writes and reads.
- * `ReadAll` the value will be read and merged from all nodes in the cluster
-(or all nodes in the cluster role group)
+   writes and reads. Exiting nodes are excluded when using `ReadMajorityPlus` because they are typically about to be
+   removed and will not be able to respond.
+ * `ReadAll` the value will be read and merged from all nodes in the cluster (or all nodes in the cluster role group).
+   Exiting nodes are excluded when using `ReadAll` because they are typically about to be removed and will not be able to respond.

 Note that `ReadMajority` and `ReadMajorityPlus` have a `minCap` parameter that is useful to specify
 to achieve better safety for small clusters.
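
For reference, below is a minimal usage sketch, not part of the patch, showing how the consistency levels touched by this change are used from application code with the classic Replicator API. The actor name `Counter`, the key "my-counter", and the chosen timeout and `additional` values are illustrative only.

import akka.actor.{ Actor, ActorLogging, ActorRef }
import akka.cluster.ddata.{ DistributedData, GCounter, GCounterKey, SelfUniqueAddress }
import akka.cluster.ddata.Replicator._

import scala.concurrent.duration._

// Hypothetical example actor, not taken from the Akka sources.
class Counter extends Actor with ActorLogging {
  private val replicator: ActorRef = DistributedData(context.system).replicator
  private implicit val node: SelfUniqueAddress = DistributedData(context.system).selfUniqueAddress

  private val counterKey = GCounterKey("my-counter")

  // Majority of nodes plus 3 additional nodes, capped at all nodes. With the patch above,
  // members in the Exiting state are no longer counted among the nodes expected to respond.
  private val writeMajorityPlus = WriteMajorityPlus(timeout = 3.seconds, additional = 3)
  private val readMajorityPlus = ReadMajorityPlus(timeout = 3.seconds, additional = 3)

  def receive: Receive = {
    case "increment" =>
      // write must be acknowledged by majority + 3 nodes before UpdateSuccess is sent back
      replicator ! Update(counterKey, GCounter.empty, writeMajorityPlus)(_ :+ 1)
    case "read" =>
      // read is merged from majority + 3 nodes before GetSuccess is sent back
      replicator ! Get(counterKey, readMajorityPlus)
    case g @ GetSuccess(`counterKey`, _) =>
      log.info("Current value: {}", g.get(counterKey).value)
    case UpdateSuccess(`counterKey`, _) => // enough replicas acknowledged within the timeout
    case _: UpdateTimeout[_] | _: GetFailure[_] => // consistency level could not be reached in time
  }
}

With these consistency levels the replicator now excludes Exiting members from the set of expected responders, so writes and reads issued while nodes are gracefully leaving (for example during a rolling restart) are less likely to time out waiting for replicas that are about to be removed.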