From 4ba835d3288a690739bddd16a01e9911a68c8d41 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 24 Apr 2020 14:19:53 +0200 Subject: [PATCH] Harden ShardCoordinator state replication, #28856 (#28895) * Possibility to prefer oldest in ddata writes and reads * enabled for Cluster Sharding * New ReadMajorityPlus and WriteMajorityPlus * used by Cluster Sharding, with configuration * also possible to define ReadAll in config --- .../typed/ClusterShardingSettings.scala | 26 +- .../src/main/resources/reference.conf | 21 +- .../sharding/ClusterShardingSettings.scala | 63 +++- .../cluster/sharding/ShardCoordinator.scala | 59 +++- .../sharding/ClusterShardingLeavingSpec.scala | 57 ++- .../MultiNodeClusterShardingSpec.scala | 2 +- .../protobuf/msg/ReplicatedDataMessages.java | 2 +- .../protobuf/msg/ReplicatorMessages.java | 324 ++++++++++++++---- .../issue-28856-coordinator-state.excludes | 11 + .../main/protobuf/ReplicatorMessages.proto | 2 + .../src/main/resources/reference.conf | 4 + .../scala/akka/cluster/ddata/Replicator.scala | 229 +++++++++---- .../ReplicatorMessageSerializer.scala | 37 +- .../akka/cluster/ddata/ReplicatorSpec.scala | 45 +++ .../cluster/ddata/WriteAggregatorSpec.scala | 55 ++- .../ReplicatorMessageSerializerSpec.scala | 3 + .../main/paradox/typed/distributed-data.md | 23 +- 17 files changed, 767 insertions(+), 196 deletions(-) create mode 100644 akka-distributed-data/src/main/mima-filters/2.6.4.backwards.excludes/issue-28856-coordinator-state.excludes diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala index ea1173a118..825d1b8027 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala @@ -83,7 +83,9 @@ object ClusterShardingSettings { entityRecoveryConstantRateStrategyFrequency = settings.tuningParameters.entityRecoveryConstantRateStrategyFrequency, entityRecoveryConstantRateStrategyNumberOfEntities = - settings.tuningParameters.entityRecoveryConstantRateStrategyNumberOfEntities), + settings.tuningParameters.entityRecoveryConstantRateStrategyNumberOfEntities, + coordinatorStateWriteMajorityPlus = settings.tuningParameters.coordinatorStateWriteMajorityPlus, + coordinatorStateReadMajorityPlus = settings.tuningParameters.coordinatorStateReadMajorityPlus), new ClassicClusterSingletonManagerSettings( settings.coordinatorSingletonSettings.singletonName, settings.coordinatorSingletonSettings.role, @@ -125,7 +127,9 @@ object ClusterShardingSettings { val shardStartTimeout: FiniteDuration, val snapshotAfter: Int, val updatingStateTimeout: FiniteDuration, - val waitingForStateTimeout: FiniteDuration) { + val waitingForStateTimeout: FiniteDuration, + val coordinatorStateWriteMajorityPlus: Int, + val coordinatorStateReadMajorityPlus: Int) { def this(classic: ClassicShardingSettings.TuningParameters) = this( @@ -145,7 +149,9 @@ object ClusterShardingSettings { updatingStateTimeout = classic.updatingStateTimeout, entityRecoveryStrategy = classic.entityRecoveryStrategy, entityRecoveryConstantRateStrategyFrequency = classic.entityRecoveryConstantRateStrategyFrequency, - entityRecoveryConstantRateStrategyNumberOfEntities = classic.entityRecoveryConstantRateStrategyNumberOfEntities) + entityRecoveryConstantRateStrategyNumberOfEntities = 
classic.entityRecoveryConstantRateStrategyNumberOfEntities,
+      coordinatorStateWriteMajorityPlus = classic.coordinatorStateWriteMajorityPlus,
+      coordinatorStateReadMajorityPlus = classic.coordinatorStateReadMajorityPlus)
 
     require(
       entityRecoveryStrategy == "all" || entityRecoveryStrategy == "constant",
       s"Unknown 'entity-recovery-strategy' [$entityRecoveryStrategy], valid values are 'all' or 'constant'")
@@ -185,6 +191,10 @@ object ClusterShardingSettings {
     def withWaitingForStateTimeout(value: FiniteDuration): TuningParameters = copy(waitingForStateTimeout = value)
     def withWaitingForStateTimeout(value: java.time.Duration): TuningParameters = withWaitingForStateTimeout(value.asScala)
+    def withCoordinatorStateWriteMajorityPlus(value: Int): TuningParameters =
+      copy(coordinatorStateWriteMajorityPlus = value)
+    def withCoordinatorStateReadMajorityPlus(value: Int): TuningParameters =
+      copy(coordinatorStateReadMajorityPlus = value)
 
     private def copy(
         bufferSize: Int = bufferSize,
@@ -203,7 +213,9 @@ object ClusterShardingSettings {
         shardStartTimeout: FiniteDuration = shardStartTimeout,
         snapshotAfter: Int = snapshotAfter,
         updatingStateTimeout: FiniteDuration = updatingStateTimeout,
-        waitingForStateTimeout: FiniteDuration = waitingForStateTimeout): TuningParameters =
+        waitingForStateTimeout: FiniteDuration = waitingForStateTimeout,
+        coordinatorStateWriteMajorityPlus: Int = coordinatorStateWriteMajorityPlus,
+        coordinatorStateReadMajorityPlus: Int = coordinatorStateReadMajorityPlus): TuningParameters =
       new TuningParameters(
         bufferSize = bufferSize,
         coordinatorFailureBackoff = coordinatorFailureBackoff,
@@ -221,10 +233,12 @@ object ClusterShardingSettings {
         shardStartTimeout = shardStartTimeout,
         snapshotAfter = snapshotAfter,
         updatingStateTimeout = updatingStateTimeout,
-        waitingForStateTimeout = waitingForStateTimeout)
+        waitingForStateTimeout = waitingForStateTimeout,
+        coordinatorStateWriteMajorityPlus = coordinatorStateWriteMajorityPlus,
+        coordinatorStateReadMajorityPlus = coordinatorStateReadMajorityPlus)
 
     override def toString =
-      s"""TuningParameters($bufferSize,$coordinatorFailureBackoff,$entityRecoveryConstantRateStrategyFrequency,$entityRecoveryConstantRateStrategyNumberOfEntities,$entityRecoveryStrategy,$entityRestartBackoff,$handOffTimeout,$keepNrOfBatches,$leastShardAllocationMaxSimultaneousRebalance,$leastShardAllocationRebalanceThreshold,$rebalanceInterval,$retryInterval,$shardFailureBackoff,$shardStartTimeout,$snapshotAfter,$updatingStateTimeout,$waitingForStateTimeout)"""
+      s"""TuningParameters($bufferSize,$coordinatorFailureBackoff,$entityRecoveryConstantRateStrategyFrequency,$entityRecoveryConstantRateStrategyNumberOfEntities,$entityRecoveryStrategy,$entityRestartBackoff,$handOffTimeout,$keepNrOfBatches,$leastShardAllocationMaxSimultaneousRebalance,$leastShardAllocationRebalanceThreshold,$rebalanceInterval,$retryInterval,$shardFailureBackoff,$shardStartTimeout,$snapshotAfter,$updatingStateTimeout,$waitingForStateTimeout,$coordinatorStateWriteMajorityPlus,$coordinatorStateReadMajorityPlus)"""
   }
 }
diff --git a/akka-cluster-sharding/src/main/resources/reference.conf b/akka-cluster-sharding/src/main/resources/reference.conf
index 0c8bd53cb2..c95406c0e9 100644
--- a/akka-cluster-sharding/src/main/resources/reference.conf
+++ b/akka-cluster-sharding/src/main/resources/reference.conf
@@ -148,6 +148,23 @@ akka.cluster.sharding {
   # be the same as "akka.cluster.sharding.role".
# A lease can be configured in these settings for the coordinator singleton
   coordinator-singleton = ${akka.cluster.singleton}
+
+  coordinator-state {
+    # State updates are required to be written to a majority of nodes plus this
+    # number of additional nodes. Can also be set to "all" to require
+    # writes to all nodes. The reason for writing/reading to more than a majority
+    # is to have more tolerance for membership changes between write and read.
+    # The tradeoff of increasing this is that updates will be slower.
+    # It is more important to increase the `read-majority-plus`.
+    write-majority-plus = 5
+    # State retrieval when the ShardCoordinator is started is required to be read
+    # from a majority of nodes plus this number of additional nodes. Can also
+    # be set to "all" to require reads from all nodes. The reason for writing/reading
+    # to more than a majority is to have more tolerance for membership changes between
+    # write and read.
+    # The tradeoff of increasing this is that coordinator startup will be slower.
+    read-majority-plus = 5
+  }
 
   # Settings for the Distributed Data replicator.
   # Same layout as akka.cluster.distributed-data.
@@ -166,7 +183,9 @@ akka.cluster.sharding {
     # can become too large if including too many in the same message. Limit to
     # the same number as the number of ORSet per shard.
     max-delta-elements = 5
-
+
+    # The ShardCoordinator is a Cluster Singleton running on the oldest node
+    prefer-oldest = on
   }
 
   # The id of the dispatcher to use for ClusterSharding actors.
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala
index 8d33f444fc..e5c199888d 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala
@@ -33,6 +33,15 @@ object ClusterShardingSettings {
    * the default configuration `akka.cluster.sharding`.
*/ def apply(config: Config): ClusterShardingSettings = { + + def configMajorityPlus(p: String): Int = { + import akka.util.Helpers.toRootLowerCase + toRootLowerCase(config.getString(p)) match { + case "all" => Int.MaxValue + case _ => config.getInt(p) + } + } + val tuningParameters = new TuningParameters( coordinatorFailureBackoff = config.getDuration("coordinator-failure-backoff", MILLISECONDS).millis, retryInterval = config.getDuration("retry-interval", MILLISECONDS).millis, @@ -53,7 +62,9 @@ object ClusterShardingSettings { entityRecoveryConstantRateStrategyFrequency = config.getDuration("entity-recovery-constant-rate-strategy.frequency", MILLISECONDS).millis, entityRecoveryConstantRateStrategyNumberOfEntities = - config.getInt("entity-recovery-constant-rate-strategy.number-of-entities")) + config.getInt("entity-recovery-constant-rate-strategy.number-of-entities"), + coordinatorStateWriteMajorityPlus = configMajorityPlus("coordinator-state.write-majority-plus"), + coordinatorStateReadMajorityPlus = configMajorityPlus("coordinator-state.read-majority-plus")) val coordinatorSingletonSettings = ClusterSingletonManagerSettings(config.getConfig("coordinator-singleton")) @@ -114,13 +125,60 @@ object ClusterShardingSettings { val updatingStateTimeout: FiniteDuration, val entityRecoveryStrategy: String, val entityRecoveryConstantRateStrategyFrequency: FiniteDuration, - val entityRecoveryConstantRateStrategyNumberOfEntities: Int) { + val entityRecoveryConstantRateStrategyNumberOfEntities: Int, + val coordinatorStateWriteMajorityPlus: Int, + val coordinatorStateReadMajorityPlus: Int) { require( entityRecoveryStrategy == "all" || entityRecoveryStrategy == "constant", s"Unknown 'entity-recovery-strategy' [$entityRecoveryStrategy], valid values are 'all' or 'constant'") // included for binary compatibility + @deprecated( + "Use the ClusterShardingSettings factory methods or the constructor including " + + "coordinatorStateWriteMajorityPlus and coordinatorStateReadMajorityPlus instead", + since = "2.6.5") + def this( + coordinatorFailureBackoff: FiniteDuration, + retryInterval: FiniteDuration, + bufferSize: Int, + handOffTimeout: FiniteDuration, + shardStartTimeout: FiniteDuration, + shardFailureBackoff: FiniteDuration, + entityRestartBackoff: FiniteDuration, + rebalanceInterval: FiniteDuration, + snapshotAfter: Int, + keepNrOfBatches: Int, + leastShardAllocationRebalanceThreshold: Int, + leastShardAllocationMaxSimultaneousRebalance: Int, + waitingForStateTimeout: FiniteDuration, + updatingStateTimeout: FiniteDuration, + entityRecoveryStrategy: String, + entityRecoveryConstantRateStrategyFrequency: FiniteDuration, + entityRecoveryConstantRateStrategyNumberOfEntities: Int) = + this( + coordinatorFailureBackoff, + retryInterval, + bufferSize, + handOffTimeout, + shardStartTimeout, + shardFailureBackoff, + entityRestartBackoff, + rebalanceInterval, + snapshotAfter, + keepNrOfBatches, + leastShardAllocationRebalanceThreshold, + leastShardAllocationMaxSimultaneousRebalance, + waitingForStateTimeout, + updatingStateTimeout, + entityRecoveryStrategy, + entityRecoveryConstantRateStrategyFrequency, + entityRecoveryConstantRateStrategyNumberOfEntities, + coordinatorStateWriteMajorityPlus = 5, + coordinatorStateReadMajorityPlus = 5) + + // included for binary compatibility + @deprecated("Use the ClusterShardingSettings factory methods or the full constructor instead", since = "2.6.5") def this( coordinatorFailureBackoff: FiniteDuration, retryInterval: FiniteDuration, @@ -159,6 +217,7 @@ object 
ClusterShardingSettings {
 }
 
   // included for binary compatibility
+  @deprecated("Use the ClusterShardingSettings factory methods or the full constructor instead", since = "2.6.5")
   def this(
       coordinatorFailureBackoff: FiniteDuration,
       retryInterval: FiniteDuration,
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
index 24c991ed97..1541ed6c5a 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
@@ -336,6 +336,7 @@ object ShardCoordinator {
   @SerialVersionUID(1L) final case class ShardRegionProxyTerminated(regionProxy: ActorRef) extends DomainEvent
   @SerialVersionUID(1L) final case class ShardHomeAllocated(shard: ShardId, region: ActorRef) extends DomainEvent
   @SerialVersionUID(1L) final case class ShardHomeDeallocated(shard: ShardId) extends DomainEvent
+  @SerialVersionUID(1L) final case object ShardCoordinatorInitialized extends DomainEvent
 
   case object StateInitialized
 
@@ -404,6 +405,8 @@ object ShardCoordinator {
             shards = shards - shard,
             regions = regions.updated(region, regions(region).filterNot(_ == shard)),
             unallocatedShards = newUnallocatedShards)
+        case ShardCoordinatorInitialized =>
+          this
       }
   }
 
@@ -752,7 +755,12 @@ abstract class ShardCoordinator(
       sender() ! reply
 
     case ShardCoordinator.Internal.Terminate =>
-      log.debug("Received termination message")
+      if (rebalanceInProgress.isEmpty)
+        log.debug("Received termination message.")
+      else
+        log.debug(
+          "Received termination message. Rebalance in progress of shards [{}].",
+          rebalanceInProgress.keySet.mkString(", "))
       context.stop(self)
 
   }: Receive).orElse[Any, Unit](receiveTerminated)
@@ -1016,6 +1024,8 @@ class PersistentShardCoordinator(
         state = state.updated(evt)
       case _: ShardHomeDeallocated =>
         state = state.updated(evt)
+      case ShardCoordinatorInitialized =>
+        () // not used here
 
     case SnapshotOffer(_, st: State) =>
@@ -1097,8 +1107,16 @@ class DDataShardCoordinator(
   import ShardCoordinator.Internal._
   import akka.cluster.ddata.Replicator.Update
 
-  private val readMajority = ReadMajority(settings.tuningParameters.waitingForStateTimeout, majorityMinCap)
-  private val writeMajority = WriteMajority(settings.tuningParameters.updatingStateTimeout, majorityMinCap)
+  private val stateReadConsistency = settings.tuningParameters.coordinatorStateReadMajorityPlus match {
+    case Int.MaxValue => ReadAll(settings.tuningParameters.waitingForStateTimeout)
+    case additional   => ReadMajorityPlus(settings.tuningParameters.waitingForStateTimeout, additional, majorityMinCap)
+  }
+  private val stateWriteConsistency = settings.tuningParameters.coordinatorStateWriteMajorityPlus match {
+    case Int.MaxValue => WriteAll(settings.tuningParameters.updatingStateTimeout)
+    case additional   => WriteMajorityPlus(settings.tuningParameters.updatingStateTimeout, additional, majorityMinCap)
+  }
+  private val allShardsReadConsistency = ReadMajority(settings.tuningParameters.waitingForStateTimeout, majorityMinCap)
+  private val allShardsWriteConsistency = WriteMajority(settings.tuningParameters.updatingStateTimeout, majorityMinCap)
 
   implicit val node: Cluster = Cluster(context.system)
   private implicit val selfUniqueAddress: SelfUniqueAddress = SelfUniqueAddress(node.selfUniqueAddress)
@@ -1128,7 +1146,8 @@ class DDataShardCoordinator(
 
   def waitingForState(remainingKeys: Set[Key[ReplicatedData]]): Receive =
     ({
      case g @ 
GetSuccess(CoordinatorStateKey, _) => - state = g.get(CoordinatorStateKey).value.withRememberEntities(settings.rememberEntities) + val reg = g.get(CoordinatorStateKey) + state = reg.value.withRememberEntities(settings.rememberEntities) log.debug("Received initial coordinator state [{}]", state) val newRemainingKeys = remainingKeys - CoordinatorStateKey if (newRemainingKeys.isEmpty) @@ -1139,7 +1158,7 @@ class DDataShardCoordinator( case GetFailure(CoordinatorStateKey, _) => log.error( "The ShardCoordinator was unable to get an initial state within 'waiting-for-state-timeout': {} millis (retrying). Has ClusterSharding been started on all nodes?", - readMajority.timeout.toMillis) + stateReadConsistency.timeout.toMillis) // repeat until GetSuccess getCoordinatorState() @@ -1163,7 +1182,7 @@ class DDataShardCoordinator( case GetFailure(AllShardsKey, _) => log.error( "The ShardCoordinator was unable to get all shards state within 'waiting-for-state-timeout': {} millis (retrying)", - readMajority.timeout.toMillis) + allShardsReadConsistency.timeout.toMillis) // repeat until GetSuccess getAllShards() @@ -1177,6 +1196,13 @@ class DDataShardCoordinator( case ShardCoordinator.Internal.Terminate => log.debug("Received termination message while waiting for state") context.stop(self) + + case Register(region) => + log.debug("ShardRegion tried to register but ShardCoordinator not initialized yet: [{}]", region) + + case RegisterProxy(region) => + log.debug("ShardRegion proxy tried to register but ShardCoordinator not initialized yet: [{}]", region) + }: Receive).orElse[Any, Unit](receiveTerminated) private def becomeWaitingForStateInitialized(): Unit = { @@ -1226,7 +1252,7 @@ class DDataShardCoordinator( log.error( "The ShardCoordinator was unable to update a distributed state within 'updating-state-timeout': {} millis ({}). " + "Perhaps the ShardRegion has not started on all active nodes yet? event={}", - writeMajority.timeout.toMillis, + stateWriteConsistency.timeout.toMillis, if (terminating) "terminating" else "retrying", evt) if (terminating) { @@ -1247,7 +1273,7 @@ class DDataShardCoordinator( case UpdateTimeout(AllShardsKey, Some(newShard: String)) => log.error( "The ShardCoordinator was unable to update shards distributed state within 'updating-state-timeout': {} millis ({}), event={}", - writeMajority.timeout.toMillis, + allShardsWriteConsistency.timeout.toMillis, if (terminating) "terminating" else "retrying", evt) if (terminating) { @@ -1336,25 +1362,28 @@ class DDataShardCoordinator( } def getCoordinatorState(): Unit = { - replicator ! Get(CoordinatorStateKey, readMajority) + replicator ! Get(CoordinatorStateKey, stateReadConsistency) } def getAllShards(): Unit = { if (rememberEntities) - replicator ! Get(AllShardsKey, readMajority) + replicator ! Get(AllShardsKey, allShardsReadConsistency) } def sendCoordinatorStateUpdate(evt: DomainEvent) = { val s = state.updated(evt) - log.debug("Publishing new coordinator state [{}]", state) - replicator ! Update(CoordinatorStateKey, LWWRegister(selfUniqueAddress, initEmptyState), writeMajority, Some(evt)) { - reg => - reg.withValueOf(s) + log.debug("Storing new coordinator state [{}]", state) + replicator ! Update( + CoordinatorStateKey, + LWWRegister(selfUniqueAddress, initEmptyState), + stateWriteConsistency, + Some(evt)) { reg => + reg.withValueOf(s) } } def sendAllShardsUpdate(newShard: String) = { - replicator ! Update(AllShardsKey, GSet.empty[String], writeMajority, Some(newShard))(_ + newShard) + replicator ! 
Update(AllShardsKey, GSet.empty[String], allShardsWriteConsistency, Some(newShard))(_ + newShard) } } diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala index b59718a954..44c99e421a 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala @@ -7,9 +7,12 @@ package akka.cluster.sharding import scala.concurrent.duration._ import akka.actor.{ Actor, ActorRef, Props } +import akka.cluster.MemberStatus import akka.serialization.jackson.CborSerializable import akka.testkit._ +import akka.util.ccompat._ +@ccompatUsedUntil213 object ClusterShardingLeavingSpec { case class Ping(id: String) extends CborSerializable @@ -39,11 +42,21 @@ object ClusterShardingLeavingSpec { } } -abstract class ClusterShardingLeavingSpecConfig(mode: String) extends MultiNodeClusterShardingConfig(mode) { +abstract class ClusterShardingLeavingSpecConfig(mode: String) + extends MultiNodeClusterShardingConfig( + mode, + loglevel = "INFO", + additionalConfig = """ + akka.cluster.sharding.rebalance-interval = 120 s + akka.cluster.sharding.distributed-data.majority-min-cap = 1 + akka.cluster.sharding.coordinator-state.write-majority-plus = 1 + akka.cluster.sharding.coordinator-state.read-majority-plus = 1 + """) { val first = role("first") val second = role("second") val third = role("third") val fourth = role("fourth") + val fifth = role("fifth") } @@ -60,11 +73,13 @@ class PersistentClusterShardingLeavingMultiJvmNode1 extends PersistentClusterSha class PersistentClusterShardingLeavingMultiJvmNode2 extends PersistentClusterShardingLeavingSpec class PersistentClusterShardingLeavingMultiJvmNode3 extends PersistentClusterShardingLeavingSpec class PersistentClusterShardingLeavingMultiJvmNode4 extends PersistentClusterShardingLeavingSpec +class PersistentClusterShardingLeavingMultiJvmNode5 extends PersistentClusterShardingLeavingSpec class DDataClusterShardingLeavingMultiJvmNode1 extends DDataClusterShardingLeavingSpec class DDataClusterShardingLeavingMultiJvmNode2 extends DDataClusterShardingLeavingSpec class DDataClusterShardingLeavingMultiJvmNode3 extends DDataClusterShardingLeavingSpec class DDataClusterShardingLeavingMultiJvmNode4 extends DDataClusterShardingLeavingSpec +class DDataClusterShardingLeavingMultiJvmNode5 extends DDataClusterShardingLeavingSpec abstract class ClusterShardingLeavingSpec(multiNodeConfig: ClusterShardingLeavingSpecConfig) extends MultiNodeClusterShardingSpec(multiNodeConfig) @@ -89,9 +104,16 @@ abstract class ClusterShardingLeavingSpec(multiNodeConfig: ClusterShardingLeavin startPersistenceIfNotDdataMode(startOn = first, setStoreOn = roles) join(first, first, onJoinedRunOnFrom = startSharding()) - join(second, first, onJoinedRunOnFrom = startSharding()) - join(third, first, onJoinedRunOnFrom = startSharding()) - join(fourth, first, onJoinedRunOnFrom = startSharding()) + join(second, first, onJoinedRunOnFrom = startSharding(), assertNodeUp = false) + join(third, first, onJoinedRunOnFrom = startSharding(), assertNodeUp = false) + join(fourth, first, onJoinedRunOnFrom = startSharding(), assertNodeUp = false) + join(fifth, first, onJoinedRunOnFrom = startSharding(), assertNodeUp = false) + + // all Up, everywhere before continuing + awaitAssert { + cluster.state.members.size should ===(roles.size) 
+ cluster.state.members.unsorted.map(_.status) should ===(Set(MemberStatus.Up)) + } enterBarrier("after-2") } @@ -105,6 +127,7 @@ abstract class ClusterShardingLeavingSpec(multiNodeConfig: ClusterShardingLeavin id -> expectMsgType[ActorRef] }).toMap shardLocations ! Locations(locations) + system.log.debug("Original locations: {}", locations) } enterBarrier("after-3") } @@ -112,28 +135,36 @@ abstract class ClusterShardingLeavingSpec(multiNodeConfig: ClusterShardingLeavin "recover after leaving coordinator node" in { system.actorSelection(node(first) / "user" / "shardLocations") ! GetLocations val Locations(originalLocations) = expectMsgType[Locations] - val firstAddress = node(first).address - runOn(third) { - cluster.leave(node(first).address) + val numberOfNodesLeaving = 2 + val leavingRoles = roles.take(numberOfNodesLeaving) + val leavingNodes = leavingRoles.map(address) + val remainingRoles = roles.drop(numberOfNodesLeaving) + + runOn(roles.last) { + leavingNodes.foreach { a => + cluster.leave(a) + } } - runOn(first) { + runOn(leavingRoles: _*) { watch(region) expectTerminated(region, 15.seconds) } - enterBarrier("stopped") + // more stress by not having the barrier here - runOn(second, third, fourth) { + runOn(remainingRoles: _*) { within(15.seconds) { awaitAssert { val probe = TestProbe() originalLocations.foreach { case (id, ref) => region.tell(Ping(id), probe.ref) - if (ref.path.address == firstAddress) - probe.expectMsgType[ActorRef](1.second) should not be (ref) - else + if (leavingNodes.contains(ref.path.address)) { + val newRef = probe.expectMsgType[ActorRef](1.second) + newRef should not be (ref) + system.log.debug("Moved [{}] from [{}] to [{}]", id, ref, newRef) + } else probe.expectMsg(1.second, ref) // should not move } } diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingSpec.scala index dceebb54d5..b8559b4668 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingSpec.scala @@ -130,7 +130,7 @@ abstract class MultiNodeClusterShardingSpec(val config: MultiNodeClusterSharding if (assertNodeUp) { within(max) { awaitAssert { - cluster.state.isMemberUp(node(from).address) + cluster.state.isMemberUp(node(from).address) should ===(true) } } } diff --git a/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatedDataMessages.java b/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatedDataMessages.java index 4eefcf5d62..9f520d7be4 100644 --- a/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatedDataMessages.java +++ b/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatedDataMessages.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2019-2020 Lightbend Inc. + * Copyright (C) 2020 Lightbend Inc. */ // Generated by the protocol buffer compiler. DO NOT EDIT! 
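The wire-level counterpart of the new consistency levels follows in ReplicatorMessages: the `Get` message gains two optional int32 fields, `consistencyMinCap` and `consistencyAdditional`, so that the majority parameters of `ReadMajorityPlus`/`WriteMajorityPlus` survive serialization between nodes, while old nodes simply ignore the unknown optional fields. As a rough sketch of that mapping (hypothetical helpers, not the patch's actual ReplicatorMessageSerializer change, which is listed in the diffstat above):

object ConsistencySerializationSketch {
  import scala.concurrent.duration.FiniteDuration
  import akka.cluster.ddata.protobuf.msg.{ ReplicatorMessages => dm }
  import akka.cluster.ddata.Replicator.{ ReadConsistency, ReadMajority, ReadMajorityPlus }

  // Fill in the two new optional fields from the in-memory consistency level.
  def setMajorityFields(b: dm.Get.Builder, consistency: ReadConsistency): dm.Get.Builder =
    consistency match {
      case ReadMajority(_, minCap) =>
        b.setConsistencyMinCap(minCap)
      case ReadMajorityPlus(_, additional, minCap) =>
        b.setConsistencyMinCap(minCap).setConsistencyAdditional(additional)
      case _ =>
        b // ReadLocal, ReadFrom and ReadAll need no extra fields
    }

  // Reconstruct the consistency level on the receiving side; an unset
  // consistencyMinCap reads as 0, the default min-cap.
  def majorityFrom(get: dm.Get, timeout: FiniteDuration): ReadConsistency =
    if (get.hasConsistencyAdditional)
      ReadMajorityPlus(timeout, get.getConsistencyAdditional, get.getConsistencyMinCap)
    else
      ReadMajority(timeout, get.getConsistencyMinCap)
}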
diff --git a/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java b/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java index 507ac11f0b..bd4fa05077 100644 --- a/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java +++ b/akka-distributed-data/src/main/java/akka/cluster/ddata/protobuf/msg/ReplicatorMessages.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2019-2020 Lightbend Inc. + * Copyright (C) 2020 Lightbend Inc. */ // Generated by the protocol buffer compiler. DO NOT EDIT! @@ -73,6 +73,28 @@ public final class ReplicatorMessages { * optional .akka.cluster.ddata.OtherMessage request = 4; */ akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessageOrBuilder getRequestOrBuilder(); + + /** + * optional int32 consistencyMinCap = 5; + * @return Whether the consistencyMinCap field is set. + */ + boolean hasConsistencyMinCap(); + /** + * optional int32 consistencyMinCap = 5; + * @return The consistencyMinCap. + */ + int getConsistencyMinCap(); + + /** + * optional int32 consistencyAdditional = 6; + * @return Whether the consistencyAdditional field is set. + */ + boolean hasConsistencyAdditional(); + /** + * optional int32 consistencyAdditional = 6; + * @return The consistencyAdditional. + */ + int getConsistencyAdditional(); } /** * Protobuf type {@code akka.cluster.ddata.Get} @@ -156,6 +178,16 @@ public final class ReplicatorMessages { bitField0_ |= 0x00000008; break; } + case 40: { + bitField0_ |= 0x00000010; + consistencyMinCap_ = input.readInt32(); + break; + } + case 48: { + bitField0_ |= 0x00000020; + consistencyAdditional_ = input.readInt32(); + break; + } default: { if (!parseUnknownField( input, unknownFields, extensionRegistry, tag)) { @@ -269,6 +301,40 @@ public final class ReplicatorMessages { return request_ == null ? akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage.getDefaultInstance() : request_; } + public static final int CONSISTENCYMINCAP_FIELD_NUMBER = 5; + private int consistencyMinCap_; + /** + * optional int32 consistencyMinCap = 5; + * @return Whether the consistencyMinCap field is set. + */ + public boolean hasConsistencyMinCap() { + return ((bitField0_ & 0x00000010) != 0); + } + /** + * optional int32 consistencyMinCap = 5; + * @return The consistencyMinCap. + */ + public int getConsistencyMinCap() { + return consistencyMinCap_; + } + + public static final int CONSISTENCYADDITIONAL_FIELD_NUMBER = 6; + private int consistencyAdditional_; + /** + * optional int32 consistencyAdditional = 6; + * @return Whether the consistencyAdditional field is set. + */ + public boolean hasConsistencyAdditional() { + return ((bitField0_ & 0x00000020) != 0); + } + /** + * optional int32 consistencyAdditional = 6; + * @return The consistencyAdditional. 
+ */ + public int getConsistencyAdditional() { + return consistencyAdditional_; + } + private byte memoizedIsInitialized = -1; @java.lang.Override public final boolean isInitialized() { @@ -317,6 +383,12 @@ public final class ReplicatorMessages { if (((bitField0_ & 0x00000008) != 0)) { output.writeMessage(4, getRequest()); } + if (((bitField0_ & 0x00000010) != 0)) { + output.writeInt32(5, consistencyMinCap_); + } + if (((bitField0_ & 0x00000020) != 0)) { + output.writeInt32(6, consistencyAdditional_); + } unknownFields.writeTo(output); } @@ -342,6 +414,14 @@ public final class ReplicatorMessages { size += akka.protobufv3.internal.CodedOutputStream .computeMessageSize(4, getRequest()); } + if (((bitField0_ & 0x00000010) != 0)) { + size += akka.protobufv3.internal.CodedOutputStream + .computeInt32Size(5, consistencyMinCap_); + } + if (((bitField0_ & 0x00000020) != 0)) { + size += akka.protobufv3.internal.CodedOutputStream + .computeInt32Size(6, consistencyAdditional_); + } size += unknownFields.getSerializedSize(); memoizedSize = size; return size; @@ -377,6 +457,16 @@ public final class ReplicatorMessages { if (!getRequest() .equals(other.getRequest())) return false; } + if (hasConsistencyMinCap() != other.hasConsistencyMinCap()) return false; + if (hasConsistencyMinCap()) { + if (getConsistencyMinCap() + != other.getConsistencyMinCap()) return false; + } + if (hasConsistencyAdditional() != other.hasConsistencyAdditional()) return false; + if (hasConsistencyAdditional()) { + if (getConsistencyAdditional() + != other.getConsistencyAdditional()) return false; + } if (!unknownFields.equals(other.unknownFields)) return false; return true; } @@ -404,6 +494,14 @@ public final class ReplicatorMessages { hash = (37 * hash) + REQUEST_FIELD_NUMBER; hash = (53 * hash) + getRequest().hashCode(); } + if (hasConsistencyMinCap()) { + hash = (37 * hash) + CONSISTENCYMINCAP_FIELD_NUMBER; + hash = (53 * hash) + getConsistencyMinCap(); + } + if (hasConsistencyAdditional()) { + hash = (37 * hash) + CONSISTENCYADDITIONAL_FIELD_NUMBER; + hash = (53 * hash) + getConsistencyAdditional(); + } hash = (29 * hash) + unknownFields.hashCode(); memoizedHashCode = hash; return hash; @@ -555,6 +653,10 @@ public final class ReplicatorMessages { requestBuilder_.clear(); } bitField0_ = (bitField0_ & ~0x00000008); + consistencyMinCap_ = 0; + bitField0_ = (bitField0_ & ~0x00000010); + consistencyAdditional_ = 0; + bitField0_ = (bitField0_ & ~0x00000020); return this; } @@ -607,6 +709,14 @@ public final class ReplicatorMessages { } to_bitField0_ |= 0x00000008; } + if (((from_bitField0_ & 0x00000010) != 0)) { + result.consistencyMinCap_ = consistencyMinCap_; + to_bitField0_ |= 0x00000010; + } + if (((from_bitField0_ & 0x00000020) != 0)) { + result.consistencyAdditional_ = consistencyAdditional_; + to_bitField0_ |= 0x00000020; + } result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -668,6 +778,12 @@ public final class ReplicatorMessages { if (other.hasRequest()) { mergeRequest(other.getRequest()); } + if (other.hasConsistencyMinCap()) { + setConsistencyMinCap(other.getConsistencyMinCap()); + } + if (other.hasConsistencyAdditional()) { + setConsistencyAdditional(other.getConsistencyAdditional()); + } this.mergeUnknownFields(other.unknownFields); onChanged(); return this; @@ -1028,6 +1144,80 @@ public final class ReplicatorMessages { } return requestBuilder_; } + + private int consistencyMinCap_ ; + /** + * optional int32 consistencyMinCap = 5; + * @return Whether the consistencyMinCap field is set. 
+ */ + public boolean hasConsistencyMinCap() { + return ((bitField0_ & 0x00000010) != 0); + } + /** + * optional int32 consistencyMinCap = 5; + * @return The consistencyMinCap. + */ + public int getConsistencyMinCap() { + return consistencyMinCap_; + } + /** + * optional int32 consistencyMinCap = 5; + * @param value The consistencyMinCap to set. + * @return This builder for chaining. + */ + public Builder setConsistencyMinCap(int value) { + bitField0_ |= 0x00000010; + consistencyMinCap_ = value; + onChanged(); + return this; + } + /** + * optional int32 consistencyMinCap = 5; + * @return This builder for chaining. + */ + public Builder clearConsistencyMinCap() { + bitField0_ = (bitField0_ & ~0x00000010); + consistencyMinCap_ = 0; + onChanged(); + return this; + } + + private int consistencyAdditional_ ; + /** + * optional int32 consistencyAdditional = 6; + * @return Whether the consistencyAdditional field is set. + */ + public boolean hasConsistencyAdditional() { + return ((bitField0_ & 0x00000020) != 0); + } + /** + * optional int32 consistencyAdditional = 6; + * @return The consistencyAdditional. + */ + public int getConsistencyAdditional() { + return consistencyAdditional_; + } + /** + * optional int32 consistencyAdditional = 6; + * @param value The consistencyAdditional to set. + * @return This builder for chaining. + */ + public Builder setConsistencyAdditional(int value) { + bitField0_ |= 0x00000020; + consistencyAdditional_ = value; + onChanged(); + return this; + } + /** + * optional int32 consistencyAdditional = 6; + * @return This builder for chaining. + */ + public Builder clearConsistencyAdditional() { + bitField0_ = (bitField0_ & ~0x00000020); + consistencyAdditional_ = 0; + onChanged(); + return this; + } @java.lang.Override public final Builder setUnknownFields( final akka.protobufv3.internal.UnknownFieldSet unknownFields) { @@ -23709,73 +23899,75 @@ public final class ReplicatorMessages { static { java.lang.String[] descriptorData = { "\n\030ReplicatorMessages.proto\022\022akka.cluster" + - ".ddata\"\215\001\n\003Get\022-\n\003key\030\001 \002(\0132 .akka.clust" + + ".ddata\"\307\001\n\003Get\022-\n\003key\030\001 \002(\0132 .akka.clust" + "er.ddata.OtherMessage\022\023\n\013consistency\030\002 \002" + "(\021\022\017\n\007timeout\030\003 \002(\r\0221\n\007request\030\004 \001(\0132 .a" + - "kka.cluster.ddata.OtherMessage\"\236\001\n\nGetSu" + - "ccess\022-\n\003key\030\001 \002(\0132 .akka.cluster.ddata." + - "OtherMessage\022.\n\004data\030\002 \002(\0132 .akka.cluste" + - "r.ddata.OtherMessage\0221\n\007request\030\004 \001(\0132 ." 
+ - "akka.cluster.ddata.OtherMessage\"l\n\010NotFo" + - "und\022-\n\003key\030\001 \002(\0132 .akka.cluster.ddata.Ot" + - "herMessage\0221\n\007request\030\002 \001(\0132 .akka.clust" + - "er.ddata.OtherMessage\"n\n\nGetFailure\022-\n\003k" + - "ey\030\001 \002(\0132 .akka.cluster.ddata.OtherMessa" + - "ge\0221\n\007request\030\002 \001(\0132 .akka.cluster.ddata" + - ".OtherMessage\"G\n\tSubscribe\022-\n\003key\030\001 \002(\0132" + - " .akka.cluster.ddata.OtherMessage\022\013\n\003ref" + - "\030\002 \002(\t\"I\n\013Unsubscribe\022-\n\003key\030\001 \002(\0132 .akk" + - "a.cluster.ddata.OtherMessage\022\013\n\003ref\030\002 \002(" + - "\t\"h\n\007Changed\022-\n\003key\030\001 \002(\0132 .akka.cluster" + - ".ddata.OtherMessage\022.\n\004data\030\002 \002(\0132 .akka" + - ".cluster.ddata.OtherMessage\"}\n\005Write\022\013\n\003" + + "kka.cluster.ddata.OtherMessage\022\031\n\021consis" + + "tencyMinCap\030\005 \001(\005\022\035\n\025consistencyAddition" + + "al\030\006 \001(\005\"\236\001\n\nGetSuccess\022-\n\003key\030\001 \002(\0132 .a" + + "kka.cluster.ddata.OtherMessage\022.\n\004data\030\002" + + " \002(\0132 .akka.cluster.ddata.OtherMessage\0221" + + "\n\007request\030\004 \001(\0132 .akka.cluster.ddata.Oth" + + "erMessage\"l\n\010NotFound\022-\n\003key\030\001 \002(\0132 .akk" + + "a.cluster.ddata.OtherMessage\0221\n\007request\030" + + "\002 \001(\0132 .akka.cluster.ddata.OtherMessage\"" + + "n\n\nGetFailure\022-\n\003key\030\001 \002(\0132 .akka.cluste" + + "r.ddata.OtherMessage\0221\n\007request\030\002 \001(\0132 ." + + "akka.cluster.ddata.OtherMessage\"G\n\tSubsc" + + "ribe\022-\n\003key\030\001 \002(\0132 .akka.cluster.ddata.O" + + "therMessage\022\013\n\003ref\030\002 \002(\t\"I\n\013Unsubscribe\022" + + "-\n\003key\030\001 \002(\0132 .akka.cluster.ddata.OtherM" + + "essage\022\013\n\003ref\030\002 \002(\t\"h\n\007Changed\022-\n\003key\030\001 " + + "\002(\0132 .akka.cluster.ddata.OtherMessage\022.\n" + + "\004data\030\002 \002(\0132 .akka.cluster.ddata.OtherMe" + + "ssage\"}\n\005Write\022\013\n\003key\030\001 \002(\t\0222\n\010envelope\030" + + "\002 \002(\0132 .akka.cluster.ddata.DataEnvelope\022" + + "3\n\010fromNode\030\003 \001(\0132!.akka.cluster.ddata.U" + + "niqueAddress\"\007\n\005Empty\"H\n\004Read\022\013\n\003key\030\001 \002" + + "(\t\0223\n\010fromNode\030\002 \001(\0132!.akka.cluster.ddat" + + "a.UniqueAddress\"@\n\nReadResult\0222\n\010envelop" + + "e\030\001 \001(\0132 .akka.cluster.ddata.DataEnvelop" + + "e\"\221\003\n\014DataEnvelope\022.\n\004data\030\001 \002(\0132 .akka." 
+ + "cluster.ddata.OtherMessage\022>\n\007pruning\030\002 " + + "\003(\0132-.akka.cluster.ddata.DataEnvelope.Pr" + + "uningEntry\0228\n\rdeltaVersions\030\003 \001(\0132!.akka" + + ".cluster.ddata.VersionVector\032\326\001\n\014Pruning" + + "Entry\0229\n\016removedAddress\030\001 \002(\0132!.akka.clu" + + "ster.ddata.UniqueAddress\0227\n\014ownerAddress" + + "\030\002 \002(\0132!.akka.cluster.ddata.UniqueAddres" + + "s\022\021\n\tperformed\030\003 \002(\010\022)\n\004seen\030\004 \003(\0132\033.akk" + + "a.cluster.ddata.Address\022\024\n\014obsoleteTime\030" + + "\005 \001(\022\"\257\001\n\006Status\022\r\n\005chunk\030\001 \002(\r\022\021\n\ttotCh" + + "unks\030\002 \002(\r\0221\n\007entries\030\003 \003(\0132 .akka.clust" + + "er.ddata.Status.Entry\022\023\n\013toSystemUid\030\004 \001" + + "(\020\022\025\n\rfromSystemUid\030\005 \001(\020\032$\n\005Entry\022\013\n\003ke" + + "y\030\001 \002(\t\022\016\n\006digest\030\002 \002(\014\"\303\001\n\006Gossip\022\020\n\010se" + + "ndBack\030\001 \002(\010\0221\n\007entries\030\002 \003(\0132 .akka.clu" + + "ster.ddata.Gossip.Entry\022\023\n\013toSystemUid\030\003" + + " \001(\020\022\025\n\rfromSystemUid\030\004 \001(\020\032H\n\005Entry\022\013\n\003" + "key\030\001 \002(\t\0222\n\010envelope\030\002 \002(\0132 .akka.clust" + - "er.ddata.DataEnvelope\0223\n\010fromNode\030\003 \001(\0132" + - "!.akka.cluster.ddata.UniqueAddress\"\007\n\005Em" + - "pty\"H\n\004Read\022\013\n\003key\030\001 \002(\t\0223\n\010fromNode\030\002 \001" + - "(\0132!.akka.cluster.ddata.UniqueAddress\"@\n" + - "\nReadResult\0222\n\010envelope\030\001 \001(\0132 .akka.clu" + - "ster.ddata.DataEnvelope\"\221\003\n\014DataEnvelope" + - "\022.\n\004data\030\001 \002(\0132 .akka.cluster.ddata.Othe" + - "rMessage\022>\n\007pruning\030\002 \003(\0132-.akka.cluster" + - ".ddata.DataEnvelope.PruningEntry\0228\n\rdelt" + - "aVersions\030\003 \001(\0132!.akka.cluster.ddata.Ver" + - "sionVector\032\326\001\n\014PruningEntry\0229\n\016removedAd" + - "dress\030\001 \002(\0132!.akka.cluster.ddata.UniqueA" + - "ddress\0227\n\014ownerAddress\030\002 \002(\0132!.akka.clus" + - "ter.ddata.UniqueAddress\022\021\n\tperformed\030\003 \002" + - "(\010\022)\n\004seen\030\004 \003(\0132\033.akka.cluster.ddata.Ad" + - "dress\022\024\n\014obsoleteTime\030\005 \001(\022\"\257\001\n\006Status\022\r" + - "\n\005chunk\030\001 \002(\r\022\021\n\ttotChunks\030\002 \002(\r\0221\n\007entr" + - "ies\030\003 \003(\0132 .akka.cluster.ddata.Status.En" + - "try\022\023\n\013toSystemUid\030\004 \001(\020\022\025\n\rfromSystemUi" + - "d\030\005 \001(\020\032$\n\005Entry\022\013\n\003key\030\001 \002(\t\022\016\n\006digest\030" + - "\002 \002(\014\"\303\001\n\006Gossip\022\020\n\010sendBack\030\001 \002(\010\0221\n\007en" + - "tries\030\002 \003(\0132 .akka.cluster.ddata.Gossip." 
+ - "Entry\022\023\n\013toSystemUid\030\003 \001(\020\022\025\n\rfromSystem" + - "Uid\030\004 \001(\020\032H\n\005Entry\022\013\n\003key\030\001 \002(\t\0222\n\010envel" + - "ope\030\002 \002(\0132 .akka.cluster.ddata.DataEnvel" + - "ope\"\201\002\n\020DeltaPropagation\0223\n\010fromNode\030\001 \002" + - "(\0132!.akka.cluster.ddata.UniqueAddress\022;\n" + - "\007entries\030\002 \003(\0132*.akka.cluster.ddata.Delt" + - "aPropagation.Entry\022\r\n\005reply\030\003 \001(\010\032l\n\005Ent" + - "ry\022\013\n\003key\030\001 \002(\t\0222\n\010envelope\030\002 \002(\0132 .akka" + - ".cluster.ddata.DataEnvelope\022\021\n\tfromSeqNr" + - "\030\003 \002(\003\022\017\n\007toSeqNr\030\004 \001(\003\"X\n\rUniqueAddress" + - "\022,\n\007address\030\001 \002(\0132\033.akka.cluster.ddata.A" + - "ddress\022\013\n\003uid\030\002 \002(\017\022\014\n\004uid2\030\003 \001(\017\")\n\007Add" + - "ress\022\020\n\010hostname\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"\224\001\n" + - "\rVersionVector\0228\n\007entries\030\001 \003(\0132\'.akka.c" + - "luster.ddata.VersionVector.Entry\032I\n\005Entr" + - "y\022/\n\004node\030\001 \002(\0132!.akka.cluster.ddata.Uni" + - "queAddress\022\017\n\007version\030\002 \002(\003\"V\n\014OtherMess" + - "age\022\027\n\017enclosedMessage\030\001 \002(\014\022\024\n\014serializ" + - "erId\030\002 \002(\005\022\027\n\017messageManifest\030\004 \001(\014\"\036\n\nS" + - "tringGSet\022\020\n\010elements\030\001 \003(\t\"\205\001\n\023DurableD" + - "ataEnvelope\022.\n\004data\030\001 \002(\0132 .akka.cluster" + - ".ddata.OtherMessage\022>\n\007pruning\030\002 \003(\0132-.a" + - "kka.cluster.ddata.DataEnvelope.PruningEn" + - "tryB#\n\037akka.cluster.ddata.protobuf.msgH\001" + "er.ddata.DataEnvelope\"\201\002\n\020DeltaPropagati" + + "on\0223\n\010fromNode\030\001 \002(\0132!.akka.cluster.ddat" + + "a.UniqueAddress\022;\n\007entries\030\002 \003(\0132*.akka." 
+ + "cluster.ddata.DeltaPropagation.Entry\022\r\n\005" + + "reply\030\003 \001(\010\032l\n\005Entry\022\013\n\003key\030\001 \002(\t\0222\n\010env" + + "elope\030\002 \002(\0132 .akka.cluster.ddata.DataEnv" + + "elope\022\021\n\tfromSeqNr\030\003 \002(\003\022\017\n\007toSeqNr\030\004 \001(" + + "\003\"X\n\rUniqueAddress\022,\n\007address\030\001 \002(\0132\033.ak" + + "ka.cluster.ddata.Address\022\013\n\003uid\030\002 \002(\017\022\014\n" + + "\004uid2\030\003 \001(\017\")\n\007Address\022\020\n\010hostname\030\001 \002(\t" + + "\022\014\n\004port\030\002 \002(\r\"\224\001\n\rVersionVector\0228\n\007entr" + + "ies\030\001 \003(\0132\'.akka.cluster.ddata.VersionVe" + + "ctor.Entry\032I\n\005Entry\022/\n\004node\030\001 \002(\0132!.akka" + + ".cluster.ddata.UniqueAddress\022\017\n\007version\030" + + "\002 \002(\003\"V\n\014OtherMessage\022\027\n\017enclosedMessage" + + "\030\001 \002(\014\022\024\n\014serializerId\030\002 \002(\005\022\027\n\017messageM" + + "anifest\030\004 \001(\014\"\036\n\nStringGSet\022\020\n\010elements\030" + + "\001 \003(\t\"\205\001\n\023DurableDataEnvelope\022.\n\004data\030\001 " + + "\002(\0132 .akka.cluster.ddata.OtherMessage\022>\n" + + "\007pruning\030\002 \003(\0132-.akka.cluster.ddata.Data" + + "Envelope.PruningEntryB#\n\037akka.cluster.dd" + + "ata.protobuf.msgH\001" }; descriptor = akka.protobufv3.internal.Descriptors.FileDescriptor .internalBuildGeneratedFileFrom(descriptorData, @@ -23786,7 +23978,7 @@ public final class ReplicatorMessages { internal_static_akka_cluster_ddata_Get_fieldAccessorTable = new akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_akka_cluster_ddata_Get_descriptor, - new java.lang.String[] { "Key", "Consistency", "Timeout", "Request", }); + new java.lang.String[] { "Key", "Consistency", "Timeout", "Request", "ConsistencyMinCap", "ConsistencyAdditional", }); internal_static_akka_cluster_ddata_GetSuccess_descriptor = getDescriptor().getMessageTypes().get(1); internal_static_akka_cluster_ddata_GetSuccess_fieldAccessorTable = new diff --git a/akka-distributed-data/src/main/mima-filters/2.6.4.backwards.excludes/issue-28856-coordinator-state.excludes b/akka-distributed-data/src/main/mima-filters/2.6.4.backwards.excludes/issue-28856-coordinator-state.excludes new file mode 100644 index 0000000000..5e95e1d862 --- /dev/null +++ b/akka-distributed-data/src/main/mima-filters/2.6.4.backwards.excludes/issue-28856-coordinator-state.excludes @@ -0,0 +1,11 @@ +# #28856 internal changes to Replicator +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.ddata.Replicator.nodes") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.ddata.Replicator.nodes_=") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.ddata.Replicator.weaklyUpNodes") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.ddata.Replicator.weaklyUpNodes_=") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.ddata.Replicator.joiningNodes") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.ddata.Replicator.joiningNodes_=") +ProblemFilters.exclude[Problem]("akka.cluster.ddata.ReadWriteAggregator.*") +ProblemFilters.exclude[Problem]("akka.cluster.ddata.ReadAggregator*") +ProblemFilters.exclude[Problem]("akka.cluster.ddata.WriteAggregator.*") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ddata.protobuf.msg.ReplicatorMessages*") diff --git a/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto 
b/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto index 1c29ad16e5..b224bbce09 100644 --- a/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto +++ b/akka-distributed-data/src/main/protobuf/ReplicatorMessages.proto @@ -14,6 +14,8 @@ message Get { required sint32 consistency = 2; required uint32 timeout = 3; optional OtherMessage request = 4; + optional int32 consistencyMinCap = 5; + optional int32 consistencyAdditional = 6; } message GetSuccess { diff --git a/akka-distributed-data/src/main/resources/reference.conf b/akka-distributed-data/src/main/resources/reference.conf index 502b48ce27..58a5e7efbc 100644 --- a/akka-distributed-data/src/main/resources/reference.conf +++ b/akka-distributed-data/src/main/resources/reference.conf @@ -56,6 +56,10 @@ akka.cluster.distributed-data { # several nodes. If no further activity they are removed from the cache # after this duration. serializer-cache-time-to-live = 10s + + # Update and Get operations are sent to oldest nodes first. + # This is useful together with Cluster Singleton, which is running on oldest nodes. + prefer-oldest = off # Settings for delta-CRDT delta-crdt { diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala index 7043cc1ddc..e58adaf5ce 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala @@ -64,7 +64,6 @@ import scala.annotation.varargs import akka.event.Logging import akka.util.JavaDurationConverters._ import akka.util.ccompat._ - import com.github.ghik.silencer.silent @ccompatUsedUntil213 @@ -91,7 +90,7 @@ object ReplicatorSettings { import akka.util.ccompat.JavaConverters._ new ReplicatorSettings( - role = roleOption(config.getString("role")), + roles = roleOption(config.getString("role")).toSet, gossipInterval = config.getDuration("gossip-interval", MILLISECONDS).millis, notifySubscribersInterval = config.getDuration("notify-subscribers-interval", MILLISECONDS).millis, maxDeltaElements = config.getInt("max-delta-elements"), @@ -103,7 +102,8 @@ object ReplicatorSettings { pruningMarkerTimeToLive = config.getDuration("pruning-marker-time-to-live", MILLISECONDS).millis, durablePruningMarkerTimeToLive = config.getDuration("durable.pruning-marker-time-to-live", MILLISECONDS).millis, deltaCrdtEnabled = config.getBoolean("delta-crdt.enabled"), - maxDeltaSize = config.getInt("delta-crdt.max-delta-size")) + maxDeltaSize = config.getInt("delta-crdt.max-delta-size"), + preferOldest = config.getBoolean("prefer-oldest")) } /** @@ -147,6 +147,7 @@ object ReplicatorSettings { * @param durableKeys Keys that are durable. Prefix matching is supported by using * `*` at the end of a key. All entries can be made durable by including "*" * in the `Set`. + * @param preferOldest Update and Get operations are sent to oldest nodes first. 
*/ final class ReplicatorSettings( val roles: Set[String], @@ -161,7 +162,39 @@ final class ReplicatorSettings( val pruningMarkerTimeToLive: FiniteDuration, val durablePruningMarkerTimeToLive: FiniteDuration, val deltaCrdtEnabled: Boolean, - val maxDeltaSize: Int) { + val maxDeltaSize: Int, + val preferOldest: Boolean) { + + // for backwards compatibility + def this( + roles: Set[String], + gossipInterval: FiniteDuration, + notifySubscribersInterval: FiniteDuration, + maxDeltaElements: Int, + dispatcher: String, + pruningInterval: FiniteDuration, + maxPruningDissemination: FiniteDuration, + durableStoreProps: Either[(String, Config), Props], + durableKeys: Set[KeyId], + pruningMarkerTimeToLive: FiniteDuration, + durablePruningMarkerTimeToLive: FiniteDuration, + deltaCrdtEnabled: Boolean, + maxDeltaSize: Int) = + this( + roles, + gossipInterval, + notifySubscribersInterval, + maxDeltaElements, + dispatcher, + pruningInterval, + maxPruningDissemination, + durableStoreProps, + durableKeys, + pruningMarkerTimeToLive, + durablePruningMarkerTimeToLive, + deltaCrdtEnabled, + maxDeltaSize, + preferOldest = false) // for backwards compatibility def this( @@ -179,7 +212,7 @@ final class ReplicatorSettings( deltaCrdtEnabled: Boolean, maxDeltaSize: Int) = this( - role.iterator.toSet, + role.toSet, gossipInterval, notifySubscribersInterval, maxDeltaElements, @@ -203,7 +236,7 @@ final class ReplicatorSettings( pruningInterval: FiniteDuration, maxPruningDissemination: FiniteDuration) = this( - roles = role.iterator.toSet, + roles = role.toSet, gossipInterval, notifySubscribersInterval, maxDeltaElements, @@ -272,9 +305,9 @@ final class ReplicatorSettings( deltaCrdtEnabled, 200) - def withRole(role: String): ReplicatorSettings = copy(roles = ReplicatorSettings.roleOption(role).iterator.toSet) + def withRole(role: String): ReplicatorSettings = copy(roles = ReplicatorSettings.roleOption(role).toSet) - def withRole(role: Option[String]): ReplicatorSettings = copy(roles = role.iterator.toSet) + def withRole(role: Option[String]): ReplicatorSettings = copy(roles = role.toSet) @varargs def withRoles(roles: String*): ReplicatorSettings = copy(roles = roles.toSet) @@ -337,6 +370,9 @@ final class ReplicatorSettings( def withMaxDeltaSize(maxDeltaSize: Int): ReplicatorSettings = copy(maxDeltaSize = maxDeltaSize) + def withPreferOldest(preferOldest: Boolean): ReplicatorSettings = + copy(preferOldest = preferOldest) + private def copy( roles: Set[String] = roles, gossipInterval: FiniteDuration = gossipInterval, @@ -350,7 +386,8 @@ final class ReplicatorSettings( pruningMarkerTimeToLive: FiniteDuration = pruningMarkerTimeToLive, durablePruningMarkerTimeToLive: FiniteDuration = durablePruningMarkerTimeToLive, deltaCrdtEnabled: Boolean = deltaCrdtEnabled, - maxDeltaSize: Int = maxDeltaSize): ReplicatorSettings = + maxDeltaSize: Int = maxDeltaSize, + preferOldest: Boolean = preferOldest): ReplicatorSettings = new ReplicatorSettings( roles, gossipInterval, @@ -364,7 +401,8 @@ final class ReplicatorSettings( pruningMarkerTimeToLive, durablePruningMarkerTimeToLive, deltaCrdtEnabled, - maxDeltaSize) + maxDeltaSize, + preferOldest) } object Replicator { @@ -403,6 +441,19 @@ object Replicator { */ def this(timeout: java.time.Duration) = this(timeout.asScala, DefaultMajorityMinCap) } + + /** + * `ReadMajority` but with the given number of `additional` nodes added to the majority count. At most + * all nodes. 
+ */ + final case class ReadMajorityPlus(timeout: FiniteDuration, additional: Int, minCap: Int = DefaultMajorityMinCap) + extends ReadConsistency { + + /** + * Java API + */ + def this(timeout: java.time.Duration, additional: Int) = this(timeout.asScala, additional, DefaultMajorityMinCap) + } final case class ReadAll(timeout: FiniteDuration) extends ReadConsistency { /** @@ -434,6 +485,19 @@ object Replicator { */ def this(timeout: java.time.Duration) = this(timeout.asScala, DefaultMajorityMinCap) } + + /** + * `WriteMajority` but with the given number of `additional` nodes added to the majority count. At most + * all nodes. + */ + final case class WriteMajorityPlus(timeout: FiniteDuration, additional: Int, minCap: Int = DefaultMajorityMinCap) + extends WriteConsistency { + + /** + * Java API + */ + def this(timeout: java.time.Duration, additional: Int) = this(timeout.asScala, additional, DefaultMajorityMinCap) + } final case class WriteAll(timeout: FiniteDuration) extends WriteConsistency { /** @@ -1008,7 +1072,9 @@ object Replicator { extends ReplicatorMessage with DestinationSystemUid - final case class Delta(dataEnvelope: DataEnvelope, fromSeqNr: Long, toSeqNr: Long) + final case class Delta(dataEnvelope: DataEnvelope, fromSeqNr: Long, toSeqNr: Long) { + def requiresCausalDeliveryOfDeltas: Boolean = dataEnvelope.data.isInstanceOf[RequiresCausalDeliveryOfDeltas] + } final case class DeltaPropagation(_fromNode: UniqueAddress, reply: Boolean, deltas: Map[KeyId, Delta]) extends ReplicatorMessage with SendingSystemUid { @@ -1288,8 +1354,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog val deltaPropagationSelector = new DeltaPropagationSelector { override val gossipIntervalDivisor = 5 override def allNodes: Vector[UniqueAddress] = { - // TODO optimize, by maintaining a sorted instance variable instead - Replicator.this.allNodes.diff(unreachable).toVector.sorted + // Replicator.allNodes is sorted + Replicator.this.allNodes.diff(unreachable).toVector } override def maxDeltaSize: Int = settings.maxDeltaSize @@ -1321,16 +1387,20 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog } else None // cluster nodes, doesn't contain selfAddress, doesn't contain joining and weaklyUp - var nodes: Set[UniqueAddress] = Set.empty + var nodes: immutable.SortedSet[UniqueAddress] = immutable.SortedSet.empty + + // cluster members sorted by age, oldest first,, doesn't contain selfAddress, doesn't contain joining and weaklyUp + // only used when prefer-oldest is enabled + var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(Member.ageOrdering) // cluster weaklyUp nodes, doesn't contain selfAddress - var weaklyUpNodes: Set[UniqueAddress] = Set.empty + var weaklyUpNodes: immutable.SortedSet[UniqueAddress] = immutable.SortedSet.empty // cluster joining nodes, doesn't contain selfAddress - var joiningNodes: Set[UniqueAddress] = Set.empty + var joiningNodes: immutable.SortedSet[UniqueAddress] = immutable.SortedSet.empty // up and weaklyUp nodes, doesn't contain joining and not selfAddress - private def allNodes: Set[UniqueAddress] = nodes.union(weaklyUpNodes) + private def allNodes: immutable.SortedSet[UniqueAddress] = nodes.union(weaklyUpNodes) private def isKnownNode(node: UniqueAddress): Boolean = nodes(node) || weaklyUpNodes(node) || joiningNodes(node) || selfUniqueAddress == node @@ -1370,6 +1440,13 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog // messages after loading durable data. 
var replyTo: ActorRef = null + private def nodesForReadWrite(): Vector[UniqueAddress] = { + if (settings.preferOldest) + membersByAge.iterator.map(_.uniqueAddress).toVector + else + nodes.toVector + } + override protected[akka] def aroundReceive(rcv: Actor.Receive, msg: Any): Unit = { replyTo = sender() try { @@ -1547,7 +1624,16 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog } else { context.actorOf( ReadAggregator - .props(key, consistency, req, selfUniqueAddress, nodes, unreachable, localValue, replyTo) + .props( + key, + consistency, + req, + selfUniqueAddress, + nodesForReadWrite(), + unreachable, + !settings.preferOldest, + localValue, + replyTo) .withDispatcher(context.props.dispatcher)) } } @@ -1632,6 +1718,10 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog case Some(d) => (newEnvelope.copy(data = d), None) case None => (newEnvelope, None) } + // When RequiresCausalDeliveryOfDeltas use deterministic order to so that sequence numbers + // of subsequent updates are in sync on the destination nodes. + // The order is also kept when prefer-oldest is enabled. + val shuffle = !(settings.preferOldest || writeDelta.exists(_.requiresCausalDeliveryOfDeltas)) val writeAggregator = context.actorOf( WriteAggregator @@ -1642,8 +1732,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog writeConsistency, req, selfUniqueAddress, - nodes, + nodesForReadWrite(), unreachable, + shuffle, replyTo, durable) .withDispatcher(context.props.dispatcher)) @@ -1758,8 +1849,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog consistency, req, selfUniqueAddress, - nodes, + nodesForReadWrite(), unreachable, + !settings.preferOldest, replyTo, durable) .withDispatcher(context.props.dispatcher)) @@ -2106,6 +2198,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog nodes += m.uniqueAddress weaklyUpNodes -= m.uniqueAddress joiningNodes -= m.uniqueAddress + if (settings.preferOldest) + membersByAge += m } } @@ -2121,6 +2215,8 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog joiningNodes -= m.uniqueAddress removedNodes = removedNodes.updated(m.uniqueAddress, allReachableClockTime) unreachable -= m.uniqueAddress + if (settings.preferOldest) + membersByAge -= m deltaPropagationSelector.cleanupRemovedNode(m.uniqueAddress) } } @@ -2260,14 +2356,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog case object SendToSecondary val MaxSecondaryNodes = 10 - def calculateMajorityWithMinCap(minCap: Int, numberOfNodes: Int): Int = { - if (numberOfNodes <= minCap) { - numberOfNodes - } else { - val majority = numberOfNodes / 2 + 1 - if (majority <= minCap) minCap - else majority - } + def calculateMajority(minCap: Int, numberOfNodes: Int, additional: Int): Int = { + val majority = numberOfNodes / 2 + 1 + math.min(numberOfNodes, math.max(majority + additional, minCap)) } } @@ -2278,30 +2369,33 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog import ReadWriteAggregator._ def timeout: FiniteDuration - def nodes: Set[UniqueAddress] + def nodes: Vector[UniqueAddress] def unreachable: Set[UniqueAddress] - def reachableNodes: Set[UniqueAddress] = nodes.diff(unreachable) + def reachableNodes: Vector[UniqueAddress] = nodes.filterNot(unreachable) + def shuffle: Boolean import context.dispatcher var sendToSecondarySchedule = context.system.scheduler.scheduleOnce(timeout / 5, 
@@ -2278,30 +2369,33 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
   import ReadWriteAggregator._
 
   def timeout: FiniteDuration
-  def nodes: Set[UniqueAddress]
+  def nodes: Vector[UniqueAddress]
   def unreachable: Set[UniqueAddress]
-  def reachableNodes: Set[UniqueAddress] = nodes.diff(unreachable)
+  def reachableNodes: Vector[UniqueAddress] = nodes.filterNot(unreachable)
+  def shuffle: Boolean
 
   import context.dispatcher
 
   var sendToSecondarySchedule = context.system.scheduler.scheduleOnce(timeout / 5, self, SendToSecondary)
   var timeoutSchedule = context.system.scheduler.scheduleOnce(timeout, self, ReceiveTimeout)
 
-  var remaining = nodes.map(_.address)
+  var remaining = nodes.iterator.map(_.address).toSet
 
   def doneWhenRemainingSize: Int
 
-  def primaryAndSecondaryNodes(
-      requiresCausalDeliveryOfDeltas: Boolean): (Vector[UniqueAddress], Vector[UniqueAddress]) = {
+  def primaryAndSecondaryNodes(): (Vector[UniqueAddress], Vector[UniqueAddress]) = {
     val primarySize = nodes.size - doneWhenRemainingSize
     if (primarySize >= nodes.size)
-      (nodes.toVector, Vector.empty[UniqueAddress])
+      (nodes, Vector.empty[UniqueAddress])
     else {
       // Prefer to use reachable nodes over the unreachable nodes first.
-      // When RequiresCausalDeliveryOfDeltas use deterministic order to so that sequence numbers of subsequent
-      // updates are in sync on the destination nodes.
+      // When RequiresCausalDeliveryOfDeltas (shuffle=false) use deterministic order so that sequence numbers
+      // of subsequent updates are in sync on the destination nodes.
+      // The order is also kept when prefer-oldest is enabled.
       val orderedNodes =
-        if (requiresCausalDeliveryOfDeltas) reachableNodes.toVector.sorted ++ unreachable.toVector.sorted
-        else scala.util.Random.shuffle(reachableNodes.toVector) ++ scala.util.Random.shuffle(unreachable.toVector)
+        if (shuffle)
+          scala.util.Random.shuffle(reachableNodes) ++ scala.util.Random.shuffle(unreachable.toVector)
+        else
+          reachableNodes ++ unreachable
       val (p, s) = orderedNodes.splitAt(primarySize)
       (p, s.take(MaxSecondaryNodes))
     }
 }
@@ -2328,8 +2422,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
       consistency: Replicator.WriteConsistency,
       req: Option[Any],
       selfUniqueAddress: UniqueAddress,
-      nodes: Set[UniqueAddress],
+      nodes: Vector[UniqueAddress],
       unreachable: Set[UniqueAddress],
+      shuffle: Boolean,
      replyTo: ActorRef,
       durable: Boolean): Props =
     Props(
@@ -2342,6 +2437,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
         selfUniqueAddress,
         nodes,
         unreachable,
+        shuffle,
         replyTo,
         durable)).withDeploy(Deploy.local)
 }
@@ -2356,8 +2452,9 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
     consistency: Replicator.WriteConsistency,
     req: Option[Any],
     selfUniqueAddress: UniqueAddress,
-    override val nodes: Set[UniqueAddress],
+    override val nodes: Vector[UniqueAddress],
     override val unreachable: Set[UniqueAddress],
+    override val shuffle: Boolean,
     replyTo: ActorRef,
     durable: Boolean)
     extends ReadWriteAggregator
@@ -2370,11 +2467,19 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
   override def timeout: FiniteDuration = consistency.timeout
 
   override val doneWhenRemainingSize = consistency match {
-    case WriteTo(n, _) => nodes.size - (n - 1)
-    case _: WriteAll   => 0
+    case WriteTo(n, _) => nodes.size - (n - 1)
+    case _: WriteAll   => 0
     case WriteMajority(_, minCap) =>
+      // +1 because local node is not included in `nodes`
       val N = nodes.size + 1
-      val w = calculateMajorityWithMinCap(minCap, N)
+      val w = calculateMajority(minCap, N, 0)
+      log.debug("WriteMajority [{}] [{}] of [{}].", key, w, N)
       N - w
+    case WriteMajorityPlus(_, additional, minCap) =>
+      // +1 because local node is not included in `nodes`
+      val N = nodes.size + 1
+      val w = calculateMajority(minCap, N, additional)
+      log.debug("WriteMajorityPlus [{}] [{}] of [{}].", key, w, N)
+      N - w
     case WriteLocal =>
       throw new IllegalArgumentException("WriteLocal not supported by WriteAggregator")
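`doneWhenRemainingSize` drives how the aggregator splits the replicas: the first `nodes.size - doneWhenRemainingSize` nodes are contacted immediately and the rest only after a fifth of the timeout, via the `SendToSecondary` tick above. A minimal illustration of that split, with strings standing in for `UniqueAddress` and the shuffling left out for determinism:

```
object PrimarySecondarySplit extends App {
  // 4 remote replicas; WriteMajority in a 5-node cluster needs 3 acks
  // (the local node plus 2 remote), so the aggregator is done when 2 remain.
  val nodes = Vector("a", "b", "c", "d") // stand-ins for UniqueAddress
  val doneWhenRemainingSize = 2
  val primarySize = nodes.size - doneWhenRemainingSize
  val (primary, secondary) = nodes.splitAt(primarySize)
  println(s"primary=$primary secondary=$secondary")
  // primary   = Vector(a, b): written to immediately
  // secondary = Vector(c, d): only contacted after timeout / 5
}
```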
@@ -2389,13 +2494,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
   var gotLocalStoreReply = !durable
   var gotWriteNackFrom = Set.empty[Address]
 
-  private val (primaryNodes, secondaryNodes) = {
-    val requiresCausalDeliveryOfDeltas = delta match {
-      case None    => false
-      case Some(d) => d.dataEnvelope.data.isInstanceOf[RequiresCausalDeliveryOfDeltas]
-    }
-    primaryAndSecondaryNodes(requiresCausalDeliveryOfDeltas)
-  }
+  private val (primaryNodes, secondaryNodes) = primaryAndSecondaryNodes()
 
   override def preStart(): Unit = {
     val msg = deltaMsg match {
@@ -2479,11 +2578,13 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
       consistency: Replicator.ReadConsistency,
       req: Option[Any],
       selfUniqueAddress: UniqueAddress,
-      nodes: Set[UniqueAddress],
+      nodes: Vector[UniqueAddress],
       unreachable: Set[UniqueAddress],
+      shuffle: Boolean,
       localValue: Option[Replicator.Internal.DataEnvelope],
       replyTo: ActorRef): Props =
-    Props(new ReadAggregator(key, consistency, req, selfUniqueAddress, nodes, unreachable, localValue, replyTo))
+    Props(
+      new ReadAggregator(key, consistency, req, selfUniqueAddress, nodes, unreachable, shuffle, localValue, replyTo))
       .withDeploy(Deploy.local)
 }
@@ -2496,11 +2597,13 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
     consistency: Replicator.ReadConsistency,
     req: Option[Any],
     selfUniqueAddress: UniqueAddress,
-    override val nodes: Set[UniqueAddress],
+    override val nodes: Vector[UniqueAddress],
     override val unreachable: Set[UniqueAddress],
+    override val shuffle: Boolean,
     localValue: Option[Replicator.Internal.DataEnvelope],
     replyTo: ActorRef)
-    extends ReadWriteAggregator {
+    extends ReadWriteAggregator
+    with ActorLogging {
 
   import Replicator._
   import Replicator.Internal._
@@ -2510,11 +2613,19 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
   var result = localValue
 
   override val doneWhenRemainingSize = consistency match {
-    case ReadFrom(n, _) => nodes.size - (n - 1)
-    case _: ReadAll     => 0
+    case ReadFrom(n, _) => nodes.size - (n - 1)
+    case _: ReadAll     => 0
     case ReadMajority(_, minCap) =>
+      // +1 because local node is not included in `nodes`
       val N = nodes.size + 1
-      val r = calculateMajorityWithMinCap(minCap, N)
+      val r = calculateMajority(minCap, N, 0)
+      log.debug("ReadMajority [{}] [{}] of [{}].", key, r, N)
+      N - r
+    case ReadMajorityPlus(_, additional, minCap) =>
+      // +1 because local node is not included in `nodes`
+      val N = nodes.size + 1
+      val r = calculateMajority(minCap, N, additional)
+      log.debug("ReadMajorityPlus [{}] [{}] of [{}].", key, r, N)
       N - r
     case ReadLocal =>
       throw new IllegalArgumentException("ReadLocal not supported by ReadAggregator")
@@ -2522,9 +2633,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
 
   val readMsg = Read(key.id, Some(selfUniqueAddress))
 
-  private val (primaryNodes, secondaryNodes) = {
-    primaryAndSecondaryNodes(requiresCausalDeliveryOfDeltas = false)
-  }
+  private val (primaryNodes, secondaryNodes) = primaryAndSecondaryNodes()
 
   override def preStart(): Unit = {
     primaryNodes.foreach { replica(_) ! readMsg }
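With both aggregators updated, applications use the new level just like `ReadMajority`. A hedged sketch of a read (the key name, timeout, and `replicator` lookup are illustrative):

```
import akka.actor.ActorRef
import akka.cluster.ddata._
import akka.cluster.ddata.Replicator._
import scala.concurrent.duration._

object MajorityPlusRead {
  val KeyK = LWWRegisterKey[String]("K")
  val replicator: ActorRef = ??? // e.g. DistributedData(system).replicator

  // Read from a majority plus 1 additional replica, never fewer than 3 nodes in total.
  def read(): Unit =
    replicator ! Get(KeyK, ReadMajorityPlus(3.seconds, 1, 3))
}
```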
diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala
index db7a6251e1..21ff8c5209 100644
--- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala
+++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala
@@ -338,21 +338,27 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
   }
 
   private def getToProto(get: Get[_]): dm.Get = {
-    val consistencyValue = get.consistency match {
-      case ReadLocal       => 1
-      case ReadFrom(n, _)  => n
-      case _: ReadMajority => 0
-      case _: ReadAll      => -1
-    }
-
     val timoutInMillis = get.consistency.timeout.toMillis
     require(timoutInMillis <= 0XFFFFFFFFL, "Timeouts must fit in a 32-bit unsigned int")
 
-    val b = dm.Get
-      .newBuilder()
-      .setKey(otherMessageToProto(get.key))
-      .setConsistency(consistencyValue)
-      .setTimeout(timoutInMillis.toInt)
+    val b = dm.Get.newBuilder().setKey(otherMessageToProto(get.key)).setTimeout(timoutInMillis.toInt)
+
+    get.consistency match {
+      case ReadLocal      => b.setConsistency(1)
+      case ReadFrom(n, _) => b.setConsistency(n)
+      case ReadMajority(_, minCap) =>
+        b.setConsistency(0)
+        if (minCap != 0)
+          b.setConsistencyMinCap(minCap)
+      case ReadMajorityPlus(_, additional, minCap) =>
+        b.setConsistency(0)
+        if (minCap != 0)
+          b.setConsistencyMinCap(minCap)
+        if (additional != 0)
+          b.setConsistencyAdditional(additional)
+      case _: ReadAll =>
+        b.setConsistency(-1)
+    }
 
     get.request.foreach(o => b.setRequest(otherMessageToProto(o)))
     b.build()
@@ -367,8 +373,13 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem)
     val timeout =
       if (get.getTimeout < 0) Duration(Int.MaxValue.toLong + (get.getTimeout - Int.MaxValue), TimeUnit.MILLISECONDS)
       else Duration(get.getTimeout.toLong, TimeUnit.MILLISECONDS)
+    def minCap = if (get.hasConsistencyMinCap) get.getConsistencyMinCap else 0
     val consistency = get.getConsistency match {
-      case 0  => ReadMajority(timeout)
+      case 0 =>
+        if (get.hasConsistencyAdditional)
+          ReadMajorityPlus(timeout, get.getConsistencyAdditional, minCap)
+        else
+          ReadMajority(timeout, minCap)
       case -1 => ReadAll(timeout)
       case 1  => ReadLocal
       case n  => ReadFrom(n, timeout)
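On the wire the consistency level is still the single legacy integer (1 = local, -1 = all, 0 = majority, n = read from n); `minCap` and `additional` travel in the new optional protobuf fields, and a `ReadMajorityPlus` is distinguished from a `ReadMajority` purely by the presence of `consistencyAdditional`. A condensed sketch of that decoding rule, with the protobuf accessors replaced by plain `Option`s to keep it self-contained:

```
import akka.cluster.ddata.Replicator._
import scala.concurrent.duration.FiniteDuration

object GetConsistencyDecoding {
  // Mirrors getFromProto above.
  def decode(consistency: Int, additional: Option[Int], minCap: Option[Int], timeout: FiniteDuration): ReadConsistency =
    consistency match {
      case 1  => ReadLocal
      case -1 => ReadAll(timeout)
      case 0 =>
        additional match {
          case Some(a) => ReadMajorityPlus(timeout, a, minCap.getOrElse(0))
          case None    => ReadMajority(timeout, minCap.getOrElse(0))
        }
      case n => ReadFrom(n, timeout)
    }
}
```

Because the extra fields are optional, a node running the old serializer simply decodes `0` as plain `ReadMajority`, which keeps the change wire-compatible.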
diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorSpec.scala
index ba81b0a3d9..257c22e917 100644
--- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorSpec.scala
+++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/ReplicatorSpec.scala
@@ -67,6 +67,7 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
   val KeyH = ORMapKey[String, Flag]("H")
   val KeyI = GSetKey[String]("I")
   val KeyJ = GSetKey[String]("J")
+  val KeyK = LWWRegisterKey[String]("K")
   val KeyX = GCounterKey("X")
   val KeyY = GCounterKey("Y")
   val KeyZ = GCounterKey("Z")
@@ -591,4 +592,48 @@ class ReplicatorSpec extends MultiNodeSpec(ReplicatorSpec) with STMultiNodeSpec
 
     enterBarrierAfterTestStep()
   }
+
+  "support prefer oldest members" in {
+    // disable gossip and delta replication to only verify the write and read operations
+    val oldestReplicator = system.actorOf(
+      Replicator.props(
+        ReplicatorSettings(system).withPreferOldest(true).withGossipInterval(1.minute).withDeltaCrdtEnabled(false)),
+      "oldestReplicator")
+    within(5.seconds) {
+      val countProbe = TestProbe()
+      awaitAssert {
+        oldestReplicator.tell(GetReplicaCount, countProbe.ref)
+        countProbe.expectMsg(ReplicaCount(3))
+      }
+    }
+    enterBarrier("oldest-replicator-started")
+
+    val probe = TestProbe()
+
+    runOn(second) {
+      oldestReplicator.tell(
+        Update(KeyK, LWWRegister(selfUniqueAddress, "0"), writeTwo)(_.withValue(selfUniqueAddress, "1")),
+        probe.ref)
+      probe.expectMsg(UpdateSuccess(KeyK, None))
+    }
+    enterBarrier("updated-1")
+
+    runOn(first) {
+      // replicated to oldest
+      oldestReplicator.tell(Get(KeyK, ReadLocal), probe.ref)
+      probe.expectMsgType[GetSuccess[LWWRegister[String]]].dataValue.value should ===("1")
+    }
+
+    runOn(third) {
+      // not replicated to third (not among the two oldest)
+      oldestReplicator.tell(Get(KeyK, ReadLocal), probe.ref)
+      probe.expectMsg(NotFound(KeyK, None))
+
+      // read from oldest
+      oldestReplicator.tell(Get(KeyK, readTwo), probe.ref)
+      probe.expectMsgType[GetSuccess[LWWRegister[String]]].dataValue.value should ===("1")
+    }
+
+    enterBarrierAfterTestStep()
+  }
 }
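The test switches prefer-oldest on programmatically; the same behavior can be enabled through configuration, as shown in the documentation change at the end of this patch. A minimal sketch of the programmatic form, using the same calls as the test above (system name illustrative):

```
import akka.actor.ActorSystem
import akka.cluster.ddata.{ Replicator, ReplicatorSettings }

object PreferOldestExample extends App {
  val system = ActorSystem("example")
  // Programmatic equivalent of `akka.cluster.distributed-data.prefer-oldest = on`.
  val settings = ReplicatorSettings(system).withPreferOldest(true)
  val replicator = system.actorOf(Replicator.props(settings), "replicator")
}
```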
diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala
index 97ebeca23f..b17ed3794f 100644
--- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala
+++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/WriteAggregatorSpec.scala
@@ -31,7 +31,7 @@ object WriteAggregatorSpec {
       consistency: Replicator.WriteConsistency,
       probes: Map[UniqueAddress, ActorRef],
       selfUniqueAddress: UniqueAddress,
-      nodes: Set[UniqueAddress],
+      nodes: Vector[UniqueAddress],
       unreachable: Set[UniqueAddress],
       replyTo: ActorRef,
       durable: Boolean): Props =
@@ -54,7 +54,7 @@ object WriteAggregatorSpec {
       consistency: Replicator.WriteConsistency,
       probes: Map[UniqueAddress, ActorRef],
       selfUniqueAddress: UniqueAddress,
-      nodes: Set[UniqueAddress],
+      nodes: Vector[UniqueAddress],
       unreachable: Set[UniqueAddress],
       replyTo: ActorRef,
       durable: Boolean): Props =
@@ -78,7 +78,7 @@ object WriteAggregatorSpec {
      consistency: Replicator.WriteConsistency,
       probes: Map[UniqueAddress, ActorRef],
       selfUniqueAddress: UniqueAddress,
-      nodes: Set[UniqueAddress],
+      nodes: Vector[UniqueAddress],
       unreachable: Set[UniqueAddress],
       replyTo: ActorRef,
       durable: Boolean)
@@ -91,6 +91,7 @@ object WriteAggregatorSpec {
         selfUniqueAddress,
         nodes,
         unreachable,
+        shuffle = false,
         replyTo,
         durable) {
 
@@ -148,7 +149,7 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
   val nodeC = UniqueAddress(Address(protocol, "Sys", "c", 2552), 17L)
   val nodeD = UniqueAddress(Address(protocol, "Sys", "d", 2552), 17L)
   // 4 replicas + the local => 5
-  val nodes = Set(nodeA, nodeB, nodeC, nodeD)
+  val nodes = Vector(nodeA, nodeB, nodeC, nodeD)
 
   val data = GSet.empty + "A" + "B"
   val timeout = 3.seconds.dilated
@@ -256,16 +257,42 @@ class WriteAggregatorSpec extends AkkaSpec(s"""
 
       import ReadWriteAggregator._
 
-      calculateMajorityWithMinCap(minCap, 3) should be(3)
-      calculateMajorityWithMinCap(minCap, 4) should be(4)
-      calculateMajorityWithMinCap(minCap, 5) should be(5)
-      calculateMajorityWithMinCap(minCap, 6) should be(5)
-      calculateMajorityWithMinCap(minCap, 7) should be(5)
-      calculateMajorityWithMinCap(minCap, 8) should be(5)
-      calculateMajorityWithMinCap(minCap, 9) should be(5)
-      calculateMajorityWithMinCap(minCap, 10) should be(6)
-      calculateMajorityWithMinCap(minCap, 11) should be(6)
-      calculateMajorityWithMinCap(minCap, 12) should be(7)
+      calculateMajority(minCap, 3, 0) should be(3)
+      calculateMajority(minCap, 4, 0) should be(4)
+      calculateMajority(minCap, 5, 0) should be(5)
+      calculateMajority(minCap, 6, 0) should be(5)
+      calculateMajority(minCap, 7, 0) should be(5)
+      calculateMajority(minCap, 8, 0) should be(5)
+      calculateMajority(minCap, 9, 0) should be(5)
+      calculateMajority(minCap, 10, 0) should be(6)
+      calculateMajority(minCap, 11, 0) should be(6)
+      calculateMajority(minCap, 12, 0) should be(7)
+    }
+
+    "calculate majority with additional" in {
+      import ReadWriteAggregator._
+
+      calculateMajority(0, 3, 1) should be(3)
+      calculateMajority(0, 3, 2) should be(3)
+      calculateMajority(0, 4, 1) should be(4)
+      calculateMajority(0, 5, 1) should be(4)
+      calculateMajority(0, 5, 2) should be(5)
+      calculateMajority(0, 6, 1) should be(5)
+      calculateMajority(0, 7, 1) should be(5)
+      calculateMajority(0, 8, 1) should be(6)
+      calculateMajority(0, 8, 2) should be(7)
+      calculateMajority(0, 9, 1) should be(6)
+      calculateMajority(0, 10, 1) should be(7)
+      calculateMajority(0, 11, 1) should be(7)
+      calculateMajority(0, 11, 3) should be(9)
+    }
+
+    "calculate majority with additional and minCap" in {
+      import ReadWriteAggregator._
+
+      calculateMajority(5, 9, 1) should be(6)
+      calculateMajority(7, 9, 1) should be(7)
+      calculateMajority(10, 9, 1) should be(9)
     }
   }
diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala
index c59f914373..b2884b8bd5 100644
--- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala
+++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala
@@ -76,6 +76,7 @@ class ReplicatorMessageSerializerSpec
       checkSerialization(Get(keyA, ReadLocal))
       checkSerialization(Get(keyA, ReadMajority(2.seconds), Some("x")))
       checkSerialization(Get(keyA, ReadMajority((Int.MaxValue.toLong + 50).milliseconds), Some("x")))
+      checkSerialization(Get(keyA, ReadMajority(2.seconds, minCap = 3), Some("x")))
       try {
         serializer.toBinary(Get(keyA, ReadMajority((Int.MaxValue.toLong * 3).milliseconds), Some("x")))
         fail("Our protobuf protocol does not support timeouts larger than unsigned ints")
@@ -83,6 +84,8 @@ class ReplicatorMessageSerializerSpec
         case e: IllegalArgumentException => e.getMessage should include("unsigned int")
       }
 
+      checkSerialization(Get(keyA, ReadMajorityPlus(2.seconds, 3), Some("x")))
+      checkSerialization(Get(keyA, ReadMajorityPlus(2.seconds, 3, 5), Some("x")))
       checkSerialization(GetSuccess(keyA, None)(data1))
       checkSerialization(GetSuccess(keyA, Some("x"))(data1))
       checkSerialization(NotFound(keyA, Some("x")))
diff --git a/akka-docs/src/main/paradox/typed/distributed-data.md b/akka-docs/src/main/paradox/typed/distributed-data.md
index bb821f7b84..573f289816 100644
--- a/akka-docs/src/main/paradox/typed/distributed-data.md
+++ b/akka-docs/src/main/paradox/typed/distributed-data.md
@@ -224,6 +224,9 @@ including the local replica
 * `WriteMajority` the value will immediately be written to a majority of replicas, i.e.
   at least **N/2 + 1** replicas, where N is the number of nodes in the cluster
   (or cluster role group)
+* `WriteMajorityPlus` is like `WriteMajority` but with the given number of `additional` nodes added
+  to the majority count, capped at the total number of nodes. This gives better tolerance for
+  membership changes between writes and reads.
 * `WriteAll` the value will immediately be written to all nodes in the cluster
   (or all nodes in the cluster role group)
 
@@ -232,7 +235,8 @@ If there are not enough Acks after a 1/5th of the timeout, the update will be re
 nodes. If there are less than n nodes left all of the remaining nodes are used.
 Reachable nodes are preferred over unreachable nodes.
 
-Note that `WriteMajority` has a `minCap` parameter that is useful to specify to achieve better safety for small clusters.
+Note that `WriteMajority` and `WriteMajorityPlus` have a `minCap` parameter that is useful to specify to
+achieve better safety for small clusters.
 
 #### Read consistency
 
@@ -254,10 +258,14 @@ including the local replica
 * `ReadMajority` the value will be read and merged from a majority of replicas, i.e.
   at least **N/2 + 1** replicas, where N is the number of nodes in the cluster
   (or cluster role group)
+* `ReadMajorityPlus` is like `ReadMajority` but with the given number of `additional` nodes added
+  to the majority count, capped at the total number of nodes. This gives better tolerance for
+  membership changes between writes and reads.
 * `ReadAll` the value will be read and merged from all nodes in the cluster
   (or all nodes in the cluster role group)
 
-Note that `ReadMajority` has a `minCap` parameter that is useful to specify to achieve better safety for small clusters.
+Note that `ReadMajority` and `ReadMajorityPlus` have a `minCap` parameter that is useful to specify to achieve
+better safety for small clusters.
 
 #### Consistency and response types
 
@@ -305,7 +313,8 @@ read stale data if the cluster membership has changed between the `Update` and t
 For example, in cluster of 5 nodes when you `Update` and that change is written to
 3 nodes: n1, n2, n3. Then 2 more nodes are added and a `Get` request is reading from
 4 nodes, which happens to be n4, n5, n6, n7, i.e. the value on n1, n2, n3 is not seen in the response of the
-`Get` request.
+`Get` request. For additional tolerance of membership changes between writes and reads you can
+use `WriteMajorityPlus` and `ReadMajorityPlus`.
 
 @@@
 
@@ -515,7 +524,13 @@ This means that the timestamp is increased for changes on the same node that occ
 the same millisecond. It also means that it is safe to use the `LWWRegister`
 without synchronized clocks when there is only one active writer, e.g. a Cluster Singleton.
 Such a single writer should then first read current value with `ReadMajority` (or more) before
-changing and writing the value with `WriteMajority` (or more).
+changing and writing the value with `WriteMajority` (or more). When using `LWWRegister`
+with Cluster Singleton it's also recommended to enable:
+
+```
+# Update and Get operations are sent to oldest nodes first.
+akka.cluster.distributed-data.prefer-oldest = on
+```
 
 ### Delta-CRDT
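To tie the `LWWRegister` guidance together, a hedged sketch of the single-writer pattern with the new consistency levels. The key name and timeouts are illustrative, and the two-argument `WriteMajorityPlus` constructor is assumed to mirror the `ReadMajorityPlus(2.seconds, 3)` form exercised in the serializer spec above:

```
import akka.actor.ActorRef
import akka.cluster.ddata._
import akka.cluster.ddata.Replicator._
import scala.concurrent.duration._

object SingleWriterLwwSketch {
  val KeyR = LWWRegisterKey[String]("active-value")
  val replicator: ActorRef = ???                          // e.g. DistributedData(system).replicator
  implicit val selfUniqueAddress: SelfUniqueAddress = ??? // e.g. DistributedData(system).selfUniqueAddress

  // Single writer (e.g. a Cluster Singleton): read before write, with one
  // additional replica on both sides to tolerate a membership change in between.
  def readThenWrite(): Unit = {
    replicator ! Get(KeyR, ReadMajorityPlus(3.seconds, 1))
    // ...and after the GetSuccess/NotFound reply arrives:
    replicator ! Update(KeyR, LWWRegister(selfUniqueAddress, "init"), WriteMajorityPlus(3.seconds, 1))(
      _.withValue(selfUniqueAddress, "new-value"))
  }
}
```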