From 32e6a59363aad99ec20c9193adbaa9eb8cc126c9 Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Tue, 9 May 2017 14:29:39 +0200
Subject: [PATCH] Start shards after full cluster restart, #22868

* when using remember entities with ddata mode, the set of shards was not saved in durable storage, and therefore the remembered entities were not loaded until the first message was sent to the shard
* the coordinator stores the set of shards in a durable GSet
* the set is loaded when the coordinator is started and added to the State; the rest is already taken care of via the unallocatedShards Set in the State
* when new shards are allocated, the durable GSet is updated if it doesn't already contain the shard identifier
---
 .../cluster/sharding/ShardCoordinator.scala    | 164 +++++++++++++++---
 .../ClusterShardingRememberEntitiesSpec.scala  |  25 ++-
 project/MiMa.scala                             |   8 +
 3 files changed, 167 insertions(+), 30 deletions(-)

diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala index b75ce81b70..031a5c51a8 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala @@ -22,6 +22,10 @@ import akka.dispatch.ExecutionContexts import akka.pattern.{ AskTimeoutException, pipe } import akka.persistence._ import akka.cluster.ClusterEvent +import akka.cluster.ddata.GSet +import akka.cluster.ddata.GSetKey +import akka.cluster.ddata.Key +import akka.cluster.ddata.ReplicatedData /** * @see [[ClusterSharding$ ClusterSharding extension]] @@ -46,7 +50,7 @@ object ShardCoordinator { allocationStrategy: ShardAllocationStrategy, replicator: ActorRef, majorityMinCap: Int): Props = Props(new DDataShardCoordinator(typeName: String, settings, allocationStrategy, replicator, - majorityMinCap)).withDeploy(Deploy.local) + majorityMinCap, settings.rememberEntities)).withDeploy(Deploy.local) /** * Interface of the pluggable shard allocation and rebalancing logic used by the [[ShardCoordinator]].
@@ -274,6 +278,11 @@ object ShardCoordinator { copy(unallocatedShards = Set.empty, rememberEntities = enabled) } + def isEmpty: Boolean = + shards.isEmpty && regions.isEmpty && regionProxies.isEmpty + + def allShards: Set[ShardId] = shards.keySet union unallocatedShards + def updated(event: DomainEvent): State = event match { case ShardRegionRegistered(region) ⇒ require(!regions.contains(region), s"Region $region already registered: $this") @@ -857,7 +866,8 @@ class PersistentShardCoordinator(typeName: String, settings: ClusterShardingSett class DDataShardCoordinator(typeName: String, settings: ClusterShardingSettings, allocationStrategy: ShardCoordinator.ShardAllocationStrategy, replicator: ActorRef, - majorityMinCap: Int) + majorityMinCap: Int, + rememberEntities: Boolean) extends ShardCoordinator(typeName, settings, allocationStrategy) with Stash { import ShardCoordinator.Internal._ import akka.cluster.ddata.Replicator.Update @@ -871,34 +881,83 @@ class DDataShardCoordinator(typeName: String, settings: ClusterShardingSettings, val CoordinatorStateKey = LWWRegisterKey[State](s"${typeName}CoordinatorState") val initEmptyState = State.empty.withRememberEntities(settings.rememberEntities) + val AllShardsKey = GSetKey[String](s"shard-${typeName}-all") + val allKeys: Set[Key[ReplicatedData]] = + if (rememberEntities) Set(CoordinatorStateKey, AllShardsKey) else Set(CoordinatorStateKey) + + var shards = Set.empty[String] + if (rememberEntities) + replicator ! Subscribe(AllShardsKey, self) + node.subscribe(self, ClusterEvent.InitialStateAsEvents, ClusterShuttingDown.getClass) // get state from ddata replicator, repeat until GetSuccess - getState() + getCoordinatorState() + getAllShards() - override def receive: Receive = waitingForState + override def receive: Receive = waitingForState(allKeys) // This state will drop all other messages since they will be retried - def waitingForState: Receive = ({ + def waitingForState(remainingKeys: Set[Key[ReplicatedData]]): Receive = ({ case g @ GetSuccess(CoordinatorStateKey, _) ⇒ state = g.get(CoordinatorStateKey).value.withRememberEntities(settings.rememberEntities) - context.become(waitingForStateInitialized) - // note that watchStateActors may call update - watchStateActors() + val newRemainingKeys = remainingKeys - CoordinatorStateKey + if (newRemainingKeys.isEmpty) + becomeWaitingForStateInitialized() + else + context.become(waitingForState(newRemainingKeys)) case GetFailure(CoordinatorStateKey, _) ⇒ log.error( - "The ShardCoordinator was unable to get an initial state within 'waiting-for-state-timeout' (was retrying): {} millis", + "The ShardCoordinator was unable to get an initial state within 'waiting-for-state-timeout': {} millis (retrying)", readMajority.timeout.toMillis) // repeat until GetSuccess - getState() + getCoordinatorState() case NotFound(CoordinatorStateKey, _) ⇒ - // empty state, activate immediately - activate() + val newRemainingKeys = remainingKeys - CoordinatorStateKey + if (newRemainingKeys.isEmpty) + becomeWaitingForStateInitialized() + else + context.become(waitingForState(newRemainingKeys)) + + case g @ GetSuccess(AllShardsKey, _) ⇒ + shards = g.get(AllShardsKey).elements + val newUnallocatedShards = state.unallocatedShards union (shards diff state.shards.keySet) + state = state.copy(unallocatedShards = newUnallocatedShards) + val newRemainingKeys = remainingKeys - AllShardsKey + if (newRemainingKeys.isEmpty) + becomeWaitingForStateInitialized() + else + context.become(waitingForState(newRemainingKeys)) + + case 
GetFailure(AllShardsKey, _) ⇒ + log.error( + "The ShardCoordinator was unable to get all shards state within 'waiting-for-state-timeout': {} millis (retrying)", + readMajority.timeout.toMillis) + // repeat until GetSuccess + getAllShards() + + case NotFound(AllShardsKey, _) ⇒ + val newRemainingKeys = remainingKeys - AllShardsKey + if (newRemainingKeys.isEmpty) + becomeWaitingForStateInitialized() + else + context.become(waitingForState(newRemainingKeys)) }: Receive).orElse[Any, Unit](receiveTerminated) + private def becomeWaitingForStateInitialized(): Unit = { + if (state.isEmpty) { + // empty state, activate immediately + activate() + } else { + context.become(waitingForStateInitialized) + // note that watchStateActors may call update + watchStateActors() + } + } + // this state will stash all messages until it receives StateInitialized, // which was scheduled by previous watchStateActors def waitingForStateInitialized: Receive = { @@ -911,50 +970,99 @@ class DDataShardCoordinator(typeName: String, settings: ClusterShardingSettings, } // this state will stash all messages until it receives UpdateSuccess - def waitingForUpdate[E <: DomainEvent](evt: E, afterUpdateCallback: E ⇒ Unit): Receive = { + def waitingForUpdate[E <: DomainEvent](evt: E, afterUpdateCallback: E ⇒ Unit, + remainingKeys: Set[Key[ReplicatedData]]): Receive = { case UpdateSuccess(CoordinatorStateKey, Some(`evt`)) ⇒ log.debug("The coordinator state was successfully updated with {}", evt) - context.unbecome() - afterUpdateCallback(evt) - unstashAll() + val newRemainingKeys = remainingKeys - CoordinatorStateKey + if (newRemainingKeys.isEmpty) + unbecomeAfterUpdate(evt, afterUpdateCallback) + else + context.become(waitingForUpdate(evt, afterUpdateCallback, newRemainingKeys)) case UpdateTimeout(CoordinatorStateKey, Some(`evt`)) ⇒ log.error( - "The ShardCoordinator was unable to update a distributed state within 'updating-state-timeout'={} millis (was retrying), event={}", - writeMajority.timeout.toMillis, - evt) + "The ShardCoordinator was unable to update a distributed state within 'updating-state-timeout': {} millis (retrying), event={}", + writeMajority.timeout.toMillis, evt) // repeat until UpdateSuccess - sendUpdate(evt) + sendCoordinatorStateUpdate(evt) - case ModifyFailure(CoordinatorStateKey, error, cause, Some(`evt`)) ⇒ + case UpdateSuccess(AllShardsKey, Some(newShard: String)) ⇒ + log.debug("The coordinator shards state was successfully updated with {}", newShard) + val newRemainingKeys = remainingKeys - AllShardsKey + if (newRemainingKeys.isEmpty) + unbecomeAfterUpdate(evt, afterUpdateCallback) + else + context.become(waitingForUpdate(evt, afterUpdateCallback, newRemainingKeys)) + + case UpdateTimeout(AllShardsKey, Some(newShard: String)) ⇒ + log.error( + "The ShardCoordinator was unable to update shards distributed state within 'updating-state-timeout': {} millis (retrying), event={}", + writeMajority.timeout.toMillis, evt) + // repeat until UpdateSuccess + sendAllShardsUpdate(newShard) + + case ModifyFailure(key, error, cause, _) ⇒ log.error( cause, - "The ShardCoordinator was unable to update a distributed state with error {} and event {}.Coordinator will be restarted", - error, - evt) + "The ShardCoordinator was unable to update a distributed state {} with error {} and event {}.Coordinator will be restarted", + key, error, evt) throw cause case _ ⇒ stash() } + private def unbecomeAfterUpdate[E <: DomainEvent](evt: E, afterUpdateCallback: E ⇒ Unit): Unit = { + context.unbecome() + afterUpdateCallback(evt) + 
unstashAll() + } + def activate() = { context.become(active) log.info("Sharding Coordinator was moved to the active state {}", state) } + override def active: Receive = + if (rememberEntities) { + ({ + case chg @ Changed(AllShardsKey) ⇒ + shards = chg.get(AllShardsKey).elements + }: Receive).orElse[Any, Unit](super.active) + } else + super.active + def update[E <: DomainEvent](evt: E)(f: E ⇒ Unit): Unit = { - context.become(waitingForUpdate(evt, f), discardOld = false) - sendUpdate(evt) + sendCoordinatorStateUpdate(evt) + evt match { + case s: ShardHomeAllocated if rememberEntities && !shards(s.shard) ⇒ + sendAllShardsUpdate(s.shard) + context.become(waitingForUpdate(evt, f, allKeys), discardOld = false) + case _ ⇒ + // no update of shards, already known + context.become(waitingForUpdate(evt, f, Set(CoordinatorStateKey)), discardOld = false) + } + } - def getState(): Unit = + def getCoordinatorState(): Unit = { replicator ! Get(CoordinatorStateKey, readMajority) + } - def sendUpdate(evt: DomainEvent) = { + def getAllShards(): Unit = { + if (rememberEntities) + replicator ! Get(AllShardsKey, readMajority) + } + + def sendCoordinatorStateUpdate(evt: DomainEvent) = { val s = state.updated(evt) replicator ! Update(CoordinatorStateKey, LWWRegister(initEmptyState), writeMajority, Some(evt)) { reg ⇒ reg.withValue(s) } } + def sendAllShardsUpdate(newShard: String) = { + replicator ! Update(AllShardsKey, GSet.empty[String], writeMajority, Some(newShard))(_ + newShard) + } + } diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala index ad4a9f6db3..0516958a9f 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala @@ -72,6 +72,13 @@ abstract class ClusterShardingRememberEntitiesSpecConfig(val mode: String) exten map-size = 10 MiB } """)) + + nodeConfig(first, second)(ConfigFactory.parseString(s""" + akka.cluster.sharding.distributed-data.durable.lmdb { + # use same directory for first and second node (not used at same time) + dir = target/ShardingRememberEntitiesSpec/sharding-first-second + } + """)) } object PersistentClusterShardingRememberEntitiesSpecConfig extends ClusterShardingRememberEntitiesSpecConfig( @@ -133,7 +140,7 @@ abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememb def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData - s"Cluster with min-nr-of-members using sharding ($mode)" must { + s"Cluster sharding with remember entities ($mode)" must { if (!isDdataMode) { "setup shared journal" in { @@ -144,7 +151,7 @@ abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememb } enterBarrier("peristence-started") - runOn(second, third) { + runOn(first, second, third) { system.actorSelection(node(first) / "user" / "store") ! 
Identify(None) val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get SharedLeveldbJournal.setStore(sharedStore, system) @@ -195,6 +202,20 @@ abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememb enterBarrier("after-2") } + "start remembered entities in new cluster" in within(30.seconds) { + runOn(first) { + testConductor.exit(third, 0).await + } + enterBarrier("crash-third") + + // no nodes left of the original cluster, start a new cluster + join(first, first) + runOn(first) { + startSharding() + expectMsgType[Started] + } + enterBarrier("after-3") + } } } diff --git a/project/MiMa.scala b/project/MiMa.scala index e210a74806..4780056ad1 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -1192,6 +1192,14 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.stream.Graph.async") ), "2.5.1" -> Seq( + + // #22868 store shards + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.sendUpdate"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.waitingForUpdate"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.getState"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.waitingForState"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.this"), + // #21213 Feature request: Let BackoffSupervisor reply to messages when its child is stopped ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.BackoffSupervisor.this"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.BackoffOptionsImpl.copy"),