diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
index b75ce81b70..031a5c51a8 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
@@ -22,6 +22,10 @@ import akka.dispatch.ExecutionContexts
 import akka.pattern.{ AskTimeoutException, pipe }
 import akka.persistence._
 import akka.cluster.ClusterEvent
+import akka.cluster.ddata.GSet
+import akka.cluster.ddata.GSetKey
+import akka.cluster.ddata.Key
+import akka.cluster.ddata.ReplicatedData
 
 /**
  * @see [[ClusterSharding$ ClusterSharding extension]]
@@ -46,7 +50,7 @@ object ShardCoordinator {
     allocationStrategy: ShardAllocationStrategy,
     replicator: ActorRef, majorityMinCap: Int): Props =
     Props(new DDataShardCoordinator(typeName: String, settings, allocationStrategy, replicator,
-      majorityMinCap)).withDeploy(Deploy.local)
+      majorityMinCap, settings.rememberEntities)).withDeploy(Deploy.local)
 
   /**
    * Interface of the pluggable shard allocation and rebalancing logic used by the [[ShardCoordinator]].
@@ -274,6 +278,11 @@ object ShardCoordinator {
       copy(unallocatedShards = Set.empty, rememberEntities = enabled)
     }
 
+    def isEmpty: Boolean =
+      shards.isEmpty && regions.isEmpty && regionProxies.isEmpty
+
+    def allShards: Set[ShardId] = shards.keySet union unallocatedShards
+
     def updated(event: DomainEvent): State = event match {
       case ShardRegionRegistered(region) ⇒
         require(!regions.contains(region), s"Region $region already registered: $this")
@@ -857,7 +866,8 @@ class PersistentShardCoordinator(typeName: String, settings: ClusterShardingSett
 class DDataShardCoordinator(typeName: String, settings: ClusterShardingSettings,
                             allocationStrategy: ShardCoordinator.ShardAllocationStrategy,
                             replicator: ActorRef,
-                            majorityMinCap: Int)
+                            majorityMinCap: Int,
+                            rememberEntities: Boolean)
   extends ShardCoordinator(typeName, settings, allocationStrategy) with Stash {
   import ShardCoordinator.Internal._
   import akka.cluster.ddata.Replicator.Update
@@ -871,34 +881,83 @@ class DDataShardCoordinator(typeName: String, settings: ClusterShardingSettings,
   val CoordinatorStateKey = LWWRegisterKey[State](s"${typeName}CoordinatorState")
   val initEmptyState = State.empty.withRememberEntities(settings.rememberEntities)
 
+  val AllShardsKey = GSetKey[String](s"shard-${typeName}-all")
+  val allKeys: Set[Key[ReplicatedData]] =
+    if (rememberEntities) Set(CoordinatorStateKey, AllShardsKey) else Set(CoordinatorStateKey)
+
+  var shards = Set.empty[String]
+  if (rememberEntities)
+    replicator ! Subscribe(AllShardsKey, self)
+
   node.subscribe(self, ClusterEvent.InitialStateAsEvents, ClusterShuttingDown.getClass)
 
   // get state from ddata replicator, repeat until GetSuccess
-  getState()
+  getCoordinatorState()
+  getAllShards()
 
-  override def receive: Receive = waitingForState
+  override def receive: Receive = waitingForState(allKeys)
 
   // This state will drop all other messages since they will be retried
-  def waitingForState: Receive = ({
+  def waitingForState(remainingKeys: Set[Key[ReplicatedData]]): Receive = ({
     case g @ GetSuccess(CoordinatorStateKey, _) ⇒
       state = g.get(CoordinatorStateKey).value.withRememberEntities(settings.rememberEntities)
-      context.become(waitingForStateInitialized)
-      // note that watchStateActors may call update
-      watchStateActors()
+      val newRemainingKeys = remainingKeys - CoordinatorStateKey
+      if (newRemainingKeys.isEmpty)
+        becomeWaitingForStateInitialized()
+      else
+        context.become(waitingForState(newRemainingKeys))
     case GetFailure(CoordinatorStateKey, _) ⇒
       log.error(
-        "The ShardCoordinator was unable to get an initial state within 'waiting-for-state-timeout' (was retrying): {} millis",
+        "The ShardCoordinator was unable to get an initial state within 'waiting-for-state-timeout': {} millis (retrying)",
        readMajority.timeout.toMillis)
       // repeat until GetSuccess
-      getState()
+      getCoordinatorState()
     case NotFound(CoordinatorStateKey, _) ⇒
-      // empty state, activate immediately
-      activate()
+      val newRemainingKeys = remainingKeys - CoordinatorStateKey
+      if (newRemainingKeys.isEmpty)
+        becomeWaitingForStateInitialized()
+      else
+        context.become(waitingForState(newRemainingKeys))
+
+    case g @ GetSuccess(AllShardsKey, _) ⇒
+      shards = g.get(AllShardsKey).elements
+      val newUnallocatedShards = state.unallocatedShards union (shards diff state.shards.keySet)
+      state = state.copy(unallocatedShards = newUnallocatedShards)
+      val newRemainingKeys = remainingKeys - AllShardsKey
+      if (newRemainingKeys.isEmpty)
+        becomeWaitingForStateInitialized()
+      else
+        context.become(waitingForState(newRemainingKeys))
+
+    case GetFailure(AllShardsKey, _) ⇒
+      log.error(
+        "The ShardCoordinator was unable to get all shards state within 'waiting-for-state-timeout': {} millis (retrying)",
+        readMajority.timeout.toMillis)
+      // repeat until GetSuccess
+      getAllShards()
+
+    case NotFound(AllShardsKey, _) ⇒
+      val newRemainingKeys = remainingKeys - AllShardsKey
+      if (newRemainingKeys.isEmpty)
+        becomeWaitingForStateInitialized()
+      else
+        context.become(waitingForState(newRemainingKeys))
   }: Receive).orElse[Any, Unit](receiveTerminated)
 
+  private def becomeWaitingForStateInitialized(): Unit = {
+    if (state.isEmpty) {
+      // empty state, activate immediately
+      activate()
+    } else {
+      context.become(waitingForStateInitialized)
+      // note that watchStateActors may call update
+      watchStateActors()
+    }
+  }
+
   // this state will stash all messages until it receives StateInitialized,
   // which was scheduled by previous watchStateActors
   def waitingForStateInitialized: Receive = {
@@ -911,50 +970,99 @@ class DDataShardCoordinator(typeName: String, settings: ClusterShardingSettings,
   }
 
   // this state will stash all messages until it receives UpdateSuccess
-  def waitingForUpdate[E <: DomainEvent](evt: E, afterUpdateCallback: E ⇒ Unit): Receive = {
+  def waitingForUpdate[E <: DomainEvent](evt: E, afterUpdateCallback: E ⇒ Unit,
+                                         remainingKeys: Set[Key[ReplicatedData]]): Receive = {
     case UpdateSuccess(CoordinatorStateKey, Some(`evt`)) ⇒
       log.debug("The coordinator state was successfully updated with {}", evt)
-      context.unbecome()
-      afterUpdateCallback(evt)
-      unstashAll()
+      val newRemainingKeys = remainingKeys - CoordinatorStateKey
+      if (newRemainingKeys.isEmpty)
+        unbecomeAfterUpdate(evt, afterUpdateCallback)
+      else
+        context.become(waitingForUpdate(evt, afterUpdateCallback, newRemainingKeys))
 
     case UpdateTimeout(CoordinatorStateKey, Some(`evt`)) ⇒
       log.error(
-        "The ShardCoordinator was unable to update a distributed state within 'updating-state-timeout'={} millis (was retrying), event={}",
-        writeMajority.timeout.toMillis,
-        evt)
+        "The ShardCoordinator was unable to update a distributed state within 'updating-state-timeout': {} millis (retrying), event={}",
+        writeMajority.timeout.toMillis, evt)
       // repeat until UpdateSuccess
-      sendUpdate(evt)
+      sendCoordinatorStateUpdate(evt)
 
-    case ModifyFailure(CoordinatorStateKey, error, cause, Some(`evt`)) ⇒
+    case UpdateSuccess(AllShardsKey, Some(newShard: String)) ⇒
+      log.debug("The coordinator shards state was successfully updated with {}", newShard)
+      val newRemainingKeys = remainingKeys - AllShardsKey
+      if (newRemainingKeys.isEmpty)
+        unbecomeAfterUpdate(evt, afterUpdateCallback)
+      else
+        context.become(waitingForUpdate(evt, afterUpdateCallback, newRemainingKeys))
+
+    case UpdateTimeout(AllShardsKey, Some(newShard: String)) ⇒
+      log.error(
+        "The ShardCoordinator was unable to update shards distributed state within 'updating-state-timeout': {} millis (retrying), event={}",
+        writeMajority.timeout.toMillis, evt)
+      // repeat until UpdateSuccess
+      sendAllShardsUpdate(newShard)
+
+    case ModifyFailure(key, error, cause, _) ⇒
       log.error(
         cause,
-        "The ShardCoordinator was unable to update a distributed state with error {} and event {}.Coordinator will be restarted",
-        error,
-        evt)
+        "The ShardCoordinator was unable to update a distributed state {} with error {} and event {}. Coordinator will be restarted",
+        key, error, evt)
       throw cause
     case _ ⇒ stash()
   }
 
+  private def unbecomeAfterUpdate[E <: DomainEvent](evt: E, afterUpdateCallback: E ⇒ Unit): Unit = {
+    context.unbecome()
+    afterUpdateCallback(evt)
+    unstashAll()
+  }
+
   def activate() = {
     context.become(active)
     log.info("Sharding Coordinator was moved to the active state {}", state)
   }
 
+  override def active: Receive =
+    if (rememberEntities) {
+      ({
+        case chg @ Changed(AllShardsKey) ⇒
+          shards = chg.get(AllShardsKey).elements
+      }: Receive).orElse[Any, Unit](super.active)
+    } else
+      super.active
+
   def update[E <: DomainEvent](evt: E)(f: E ⇒ Unit): Unit = {
-    context.become(waitingForUpdate(evt, f), discardOld = false)
-    sendUpdate(evt)
+    sendCoordinatorStateUpdate(evt)
+    evt match {
+      case s: ShardHomeAllocated if rememberEntities && !shards(s.shard) ⇒
+        sendAllShardsUpdate(s.shard)
+        context.become(waitingForUpdate(evt, f, allKeys), discardOld = false)
+      case _ ⇒
+        // no update of shards, already known
+        context.become(waitingForUpdate(evt, f, Set(CoordinatorStateKey)), discardOld = false)
+    }
   }
 
-  def getState(): Unit =
+  def getCoordinatorState(): Unit = {
     replicator ! Get(CoordinatorStateKey, readMajority)
+  }
 
-  def sendUpdate(evt: DomainEvent) = {
+  def getAllShards(): Unit = {
+    if (rememberEntities)
+      replicator ! Get(AllShardsKey, readMajority)
+  }
+
+  def sendCoordinatorStateUpdate(evt: DomainEvent) = {
     val s = state.updated(evt)
     replicator ! Update(CoordinatorStateKey, LWWRegister(initEmptyState), writeMajority, Some(evt)) { reg ⇒
       reg.withValue(s)
     }
   }
 
+  def sendAllShardsUpdate(newShard: String) = {
+    replicator ! Update(AllShardsKey, GSet.empty[String], writeMajority, Some(newShard))(_ + newShard)
+  }
 }
diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala
index ad4a9f6db3..0516958a9f 100644
--- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala
+++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala
@@ -72,6 +72,13 @@ abstract class ClusterShardingRememberEntitiesSpecConfig(val mode: String) exten
       map-size = 10 MiB
     }
     """))
+
+  nodeConfig(first, second)(ConfigFactory.parseString(s"""
+    akka.cluster.sharding.distributed-data.durable.lmdb {
+      # use same directory for first and second node (not used at same time)
+      dir = target/ShardingRememberEntitiesSpec/sharding-first-second
+    }
+    """))
 }
 
 object PersistentClusterShardingRememberEntitiesSpecConfig extends ClusterShardingRememberEntitiesSpecConfig(
@@ -133,7 +140,7 @@ abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememb
 
   def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData
 
-  s"Cluster with min-nr-of-members using sharding ($mode)" must {
+  s"Cluster sharding with remember entities ($mode)" must {
 
     if (!isDdataMode) {
       "setup shared journal" in {
@@ -144,7 +151,7 @@ abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememb
         }
         enterBarrier("peristence-started")
 
-        runOn(second, third) {
+        runOn(first, second, third) {
           system.actorSelection(node(first) / "user" / "store") ! Identify(None)
           val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get
           SharedLeveldbJournal.setStore(sharedStore, system)
@@ -195,6 +202,20 @@ abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememb
       enterBarrier("after-2")
     }
 
+    "start remembered entities in new cluster" in within(30.seconds) {
+      runOn(first) {
+        testConductor.exit(third, 0).await
+      }
+      enterBarrier("crash-third")
+
+      // no nodes left of the original cluster, start a new cluster
+      join(first, first)
+      runOn(first) {
+        startSharding()
+        expectMsgType[Started]
+      }
+      enterBarrier("after-3")
+    }
   }
 }
diff --git a/project/MiMa.scala b/project/MiMa.scala
index e210a74806..4780056ad1 100644
--- a/project/MiMa.scala
+++ b/project/MiMa.scala
@@ -1192,6 +1192,14 @@ object MiMa extends AutoPlugin {
       ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.stream.Graph.async")
     ),
     "2.5.1" -> Seq(
+
+      // #22868 store shards
+      ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.sendUpdate"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.waitingForUpdate"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.getState"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.waitingForState"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.this"),
+
       // #21213 Feature request: Let BackoffSupervisor reply to messages when its child is stopped
       ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.BackoffSupervisor.this"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.BackoffOptionsImpl.copy"),
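
For context, the new AllShardsKey bookkeeping in DDataShardCoordinator only comes into play when remember-entities is enabled for a region running in ddata state-store mode. Below is a minimal sketch of how a user switches that on, assuming the Akka 2.5 ClusterSharding API; the CounterEntity actor, the "Counter" type name, the shard count of 10, and the single-node join are illustrative placeholders, not part of this patch:

import akka.actor.{ Actor, ActorRef, ActorSystem, Props }
import akka.cluster.Cluster
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }
import com.typesafe.config.ConfigFactory

// placeholder entity, only here to make the sketch self-contained
class CounterEntity extends Actor {
  def receive = { case msg ⇒ sender() ! msg }
}

object RememberEntitiesSketch extends App {
  // ddata state-store mode selects DDataShardCoordinator, the class changed above
  val config = ConfigFactory.parseString("""
    akka.actor.provider = cluster
    akka.cluster.sharding.state-store-mode = ddata
    """).withFallback(ConfigFactory.load())

  val system = ActorSystem("Sharding", config)
  val cluster = Cluster(system)
  cluster.join(cluster.selfAddress) // single-node cluster, just for the sketch

  val extractEntityId: ShardRegion.ExtractEntityId = {
    case msg @ (id: String, _) ⇒ (id, msg)
  }
  val extractShardId: ShardRegion.ExtractShardId = {
    // remembered entities are restarted via StartEntity, which must also map to a shard
    case ShardRegion.StartEntity(id) ⇒ (math.abs(id.hashCode) % 10).toString
    case (id: String, _)             ⇒ (math.abs(id.hashCode) % 10).toString
  }

  // withRememberEntities(true) is what makes the coordinator maintain the new
  // AllShardsKey GSet in addition to CoordinatorStateKey, so that a completely
  // new cluster can restart all previously allocated shards
  val region: ActorRef = ClusterSharding(system).start(
    typeName = "Counter",
    entityProps = Props[CounterEntity],
    settings = ClusterShardingSettings(system).withRememberEntities(true),
    extractEntityId = extractEntityId,
    extractShardId = extractShardId)
}

With remember-entities off, the coordinator reads and writes CoordinatorStateKey only, which is why allKeys in the patch collapses to Set(CoordinatorStateKey) in that case.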