Merge pull request #22877 from akka/wip-22868-store-shards-patriknw
Start shards after full cluster restart, #22868
Commit 627fedd514
3 changed files with 167 additions and 30 deletions
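Summary of the change (as the diff below shows): in ddata mode the ShardCoordinator now stores the ids of all allocated shards in an extra replicated GSet, one per entity type, alongside the LWWRegister that holds the coordinator state. At start-up it reads that set back and treats every shard that is not in the recovered state as unallocated, so shards remembered from before a full cluster restart are allocated, and therefore started, again. A minimal sketch of that storage pattern follows; it is not the code in this PR, and the AllShardsStore trait, the 5-second timeouts and the helper methods are invented for illustration:

import akka.actor.{ Actor, ActorRef }
import akka.cluster.ddata.{ GSet, GSetKey }
import akka.cluster.ddata.Replicator.{ Get, GetSuccess, ReadMajority, Update, WriteMajority }
import scala.concurrent.duration._

// Hypothetical helper trait (not part of Akka) mirroring the pattern used in the diff.
// Mixed into an actor so that Replicator replies (UpdateSuccess, GetSuccess, ...) arrive at self.
trait AllShardsStore { this: Actor ⇒
  def typeName: String
  def replicator: ActorRef

  // one grow-only set of shard ids per entity type, same key scheme as in the diff
  val AllShardsKey = GSetKey[String](s"shard-$typeName-all")
  private val writeMajority = WriteMajority(5.seconds)
  private val readMajority = ReadMajority(5.seconds)

  // record a newly allocated shard; a GSet only grows, so concurrent adds merge cleanly
  def rememberShard(shardId: String): Unit =
    replicator ! Update(AllShardsKey, GSet.empty[String], writeMajority, Some(shardId))(_ + shardId)

  // ask for the full set on (re)start; the reply (GetSuccess, NotFound or GetFailure) is sent to self
  def requestAllShards(): Unit =
    replicator ! Get(AllShardsKey, readMajority)

  // extract the remembered shard ids from a successful reply
  def shardsFrom(g: GetSuccess[GSet[String]]): Set[String] =
    g.get(AllShardsKey).elements
}

In the real coordinator the same key is also subscribed to, so a running coordinator keeps seeing newly added shard ids from other nodes.
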
ShardCoordinator.scala

@@ -22,6 +22,10 @@ import akka.dispatch.ExecutionContexts
 import akka.pattern.{ AskTimeoutException, pipe }
 import akka.persistence._
 import akka.cluster.ClusterEvent
+import akka.cluster.ddata.GSet
+import akka.cluster.ddata.GSetKey
+import akka.cluster.ddata.Key
+import akka.cluster.ddata.ReplicatedData

 /**
  * @see [[ClusterSharding$ ClusterSharding extension]]
@@ -46,7 +50,7 @@ object ShardCoordinator {
     allocationStrategy: ShardAllocationStrategy,
     replicator: ActorRef, majorityMinCap: Int): Props =
     Props(new DDataShardCoordinator(typeName: String, settings, allocationStrategy, replicator,
-      majorityMinCap)).withDeploy(Deploy.local)
+      majorityMinCap, settings.rememberEntities)).withDeploy(Deploy.local)

   /**
    * Interface of the pluggable shard allocation and rebalancing logic used by the [[ShardCoordinator]].
@@ -274,6 +278,11 @@ object ShardCoordinator {
         copy(unallocatedShards = Set.empty, rememberEntities = enabled)
     }

+    def isEmpty: Boolean =
+      shards.isEmpty && regions.isEmpty && regionProxies.isEmpty
+
+    def allShards: Set[ShardId] = shards.keySet union unallocatedShards
+
     def updated(event: DomainEvent): State = event match {
       case ShardRegionRegistered(region) ⇒
         require(!regions.contains(region), s"Region $region already registered: $this")
@@ -857,7 +866,8 @@ class PersistentShardCoordinator(typeName: String, settings: ClusterShardingSett
 class DDataShardCoordinator(typeName: String, settings: ClusterShardingSettings,
                             allocationStrategy: ShardCoordinator.ShardAllocationStrategy,
                             replicator: ActorRef,
-                            majorityMinCap: Int)
+                            majorityMinCap: Int,
+                            rememberEntities: Boolean)
   extends ShardCoordinator(typeName, settings, allocationStrategy) with Stash {
   import ShardCoordinator.Internal._
   import akka.cluster.ddata.Replicator.Update
@@ -871,34 +881,83 @@ class DDataShardCoordinator(typeName: String, settings: ClusterShardingSettings,
   val CoordinatorStateKey = LWWRegisterKey[State](s"${typeName}CoordinatorState")
   val initEmptyState = State.empty.withRememberEntities(settings.rememberEntities)

+  val AllShardsKey = GSetKey[String](s"shard-${typeName}-all")
+  val allKeys: Set[Key[ReplicatedData]] =
+    if (rememberEntities) Set(CoordinatorStateKey, AllShardsKey) else Set(CoordinatorStateKey)
+
+  var shards = Set.empty[String]
+  if (rememberEntities)
+    replicator ! Subscribe(AllShardsKey, self)
+
   node.subscribe(self, ClusterEvent.InitialStateAsEvents, ClusterShuttingDown.getClass)

   // get state from ddata replicator, repeat until GetSuccess
-  getState()
+  getCoordinatorState()
+  getAllShards()

-  override def receive: Receive = waitingForState
+  override def receive: Receive = waitingForState(allKeys)

   // This state will drop all other messages since they will be retried
-  def waitingForState: Receive = ({
+  def waitingForState(remainingKeys: Set[Key[ReplicatedData]]): Receive = ({
     case g @ GetSuccess(CoordinatorStateKey, _) ⇒
       state = g.get(CoordinatorStateKey).value.withRememberEntities(settings.rememberEntities)
-      context.become(waitingForStateInitialized)
-      // note that watchStateActors may call update
-      watchStateActors()
+      val newRemainingKeys = remainingKeys - CoordinatorStateKey
+      if (newRemainingKeys.isEmpty)
+        becomeWaitingForStateInitialized()
+      else
+        context.become(waitingForState(newRemainingKeys))

     case GetFailure(CoordinatorStateKey, _) ⇒
       log.error(
-        "The ShardCoordinator was unable to get an initial state within 'waiting-for-state-timeout' (was retrying): {} millis",
+        "The ShardCoordinator was unable to get an initial state within 'waiting-for-state-timeout': {} millis (retrying)",
         readMajority.timeout.toMillis)
       // repeat until GetSuccess
-      getState()
+      getCoordinatorState()

     case NotFound(CoordinatorStateKey, _) ⇒
-      // empty state, activate immediately
-      activate()
+      val newRemainingKeys = remainingKeys - CoordinatorStateKey
+      if (newRemainingKeys.isEmpty)
+        becomeWaitingForStateInitialized()
+      else
+        context.become(waitingForState(newRemainingKeys))

+    case g @ GetSuccess(AllShardsKey, _) ⇒
+      shards = g.get(AllShardsKey).elements
+      val newUnallocatedShards = state.unallocatedShards union (shards diff state.shards.keySet)
+      state = state.copy(unallocatedShards = newUnallocatedShards)
+      val newRemainingKeys = remainingKeys - AllShardsKey
+      if (newRemainingKeys.isEmpty)
+        becomeWaitingForStateInitialized()
+      else
+        context.become(waitingForState(newRemainingKeys))
+
+    case GetFailure(AllShardsKey, _) ⇒
+      log.error(
+        "The ShardCoordinator was unable to get all shards state within 'waiting-for-state-timeout': {} millis (retrying)",
+        readMajority.timeout.toMillis)
+      // repeat until GetSuccess
+      getAllShards()
+
+    case NotFound(AllShardsKey, _) ⇒
+      val newRemainingKeys = remainingKeys - AllShardsKey
+      if (newRemainingKeys.isEmpty)
+        becomeWaitingForStateInitialized()
+      else
+        context.become(waitingForState(newRemainingKeys))
+
   }: Receive).orElse[Any, Unit](receiveTerminated)

+  private def becomeWaitingForStateInitialized(): Unit = {
+    if (state.isEmpty) {
+      // empty state, activate immediately
+      activate()
+    } else {
+      context.become(waitingForStateInitialized)
+      // note that watchStateActors may call update
+      watchStateActors()
+    }
+  }
+
   // this state will stash all messages until it receives StateInitialized,
   // which was scheduled by previous watchStateActors
   def waitingForStateInitialized: Receive = {
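A worked example of the set arithmetic in the GetSuccess(AllShardsKey) case above; the concrete shard ids are made up:

// values are illustrative only
val rememberedShards      = Set("1", "2", "3", "4")  // read from AllShardsKey
val allocatedShards       = Set("1", "2")            // state.shards.keySet after recovery
val previouslyUnallocated = Set.empty[String]        // state.unallocatedShards

val newUnallocated = previouslyUnallocated union (rememberedShards diff allocatedShards)
// newUnallocated == Set("3", "4"): these shards get allocated, and thus started, again
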
@@ -911,50 +970,99 @@ class DDataShardCoordinator(typeName: String, settings: ClusterShardingSettings,
   }

   // this state will stash all messages until it receives UpdateSuccess
-  def waitingForUpdate[E <: DomainEvent](evt: E, afterUpdateCallback: E ⇒ Unit): Receive = {
+  def waitingForUpdate[E <: DomainEvent](evt: E, afterUpdateCallback: E ⇒ Unit,
+                                         remainingKeys: Set[Key[ReplicatedData]]): Receive = {
     case UpdateSuccess(CoordinatorStateKey, Some(`evt`)) ⇒
       log.debug("The coordinator state was successfully updated with {}", evt)
-      context.unbecome()
-      afterUpdateCallback(evt)
-      unstashAll()
+      val newRemainingKeys = remainingKeys - CoordinatorStateKey
+      if (newRemainingKeys.isEmpty)
+        unbecomeAfterUpdate(evt, afterUpdateCallback)
+      else
+        context.become(waitingForUpdate(evt, afterUpdateCallback, newRemainingKeys))

     case UpdateTimeout(CoordinatorStateKey, Some(`evt`)) ⇒
       log.error(
-        "The ShardCoordinator was unable to update a distributed state within 'updating-state-timeout'={} millis (was retrying), event={}",
-        writeMajority.timeout.toMillis,
-        evt)
+        "The ShardCoordinator was unable to update a distributed state within 'updating-state-timeout': {} millis (retrying), event={}",
+        writeMajority.timeout.toMillis, evt)
       // repeat until UpdateSuccess
-      sendUpdate(evt)
+      sendCoordinatorStateUpdate(evt)

-    case ModifyFailure(CoordinatorStateKey, error, cause, Some(`evt`)) ⇒
+    case UpdateSuccess(AllShardsKey, Some(newShard: String)) ⇒
+      log.debug("The coordinator shards state was successfully updated with {}", newShard)
+      val newRemainingKeys = remainingKeys - AllShardsKey
+      if (newRemainingKeys.isEmpty)
+        unbecomeAfterUpdate(evt, afterUpdateCallback)
+      else
+        context.become(waitingForUpdate(evt, afterUpdateCallback, newRemainingKeys))
+
+    case UpdateTimeout(AllShardsKey, Some(newShard: String)) ⇒
+      log.error(
+        "The ShardCoordinator was unable to update shards distributed state within 'updating-state-timeout': {} millis (retrying), event={}",
+        writeMajority.timeout.toMillis, evt)
+      // repeat until UpdateSuccess
+      sendAllShardsUpdate(newShard)
+
+    case ModifyFailure(key, error, cause, _) ⇒
       log.error(
         cause,
-        "The ShardCoordinator was unable to update a distributed state with error {} and event {}.Coordinator will be restarted",
-        error,
-        evt)
+        "The ShardCoordinator was unable to update a distributed state {} with error {} and event {}.Coordinator will be restarted",
+        key, error, evt)
       throw cause

     case _ ⇒ stash()
   }

+  private def unbecomeAfterUpdate[E <: DomainEvent](evt: E, afterUpdateCallback: E ⇒ Unit): Unit = {
+    context.unbecome()
+    afterUpdateCallback(evt)
+    unstashAll()
+  }
+
   def activate() = {
     context.become(active)
     log.info("Sharding Coordinator was moved to the active state {}", state)
   }

+  override def active: Receive =
+    if (rememberEntities) {
+      ({
+        case chg @ Changed(AllShardsKey) ⇒
+          shards = chg.get(AllShardsKey).elements
+      }: Receive).orElse[Any, Unit](super.active)
+    } else
+      super.active
+
   def update[E <: DomainEvent](evt: E)(f: E ⇒ Unit): Unit = {
-    context.become(waitingForUpdate(evt, f), discardOld = false)
-    sendUpdate(evt)
+    sendCoordinatorStateUpdate(evt)
+    evt match {
+      case s: ShardHomeAllocated if rememberEntities && !shards(s.shard) ⇒
+        sendAllShardsUpdate(s.shard)
+        context.become(waitingForUpdate(evt, f, allKeys), discardOld = false)
+      case _ ⇒
+        // no update of shards, already known
+        context.become(waitingForUpdate(evt, f, Set(CoordinatorStateKey)), discardOld = false)
+    }
+
   }

-  def getState(): Unit =
+  def getCoordinatorState(): Unit = {
     replicator ! Get(CoordinatorStateKey, readMajority)
+  }

-  def sendUpdate(evt: DomainEvent) = {
+  def getAllShards(): Unit = {
+    if (rememberEntities)
+      replicator ! Get(AllShardsKey, readMajority)
+  }
+
+  def sendCoordinatorStateUpdate(evt: DomainEvent) = {
     val s = state.updated(evt)
     replicator ! Update(CoordinatorStateKey, LWWRegister(initEmptyState), writeMajority, Some(evt)) { reg ⇒
       reg.withValue(s)
     }
   }

+  def sendAllShardsUpdate(newShard: String) = {
+    replicator ! Update(AllShardsKey, GSet.empty[String], writeMajority, Some(newShard))(_ + newShard)
+  }
+
 }
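Note that waitingForState and waitingForUpdate above share the same shape: the behavior carries the set of replicator keys that are still outstanding, and the coordinator only proceeds (or unstashes) once that set is empty. A stripped-down sketch of the pattern with a made-up Ack message standing in for the GetSuccess/UpdateSuccess replies (WaitForAllKeys is not an Akka class):

import akka.actor.{ Actor, ActorLogging, Stash }

object WaitForAllKeys {
  // stand-in for UpdateSuccess/GetSuccess carrying the acknowledged key
  final case class Ack(key: String)
}

class WaitForAllKeys(allKeys: Set[String]) extends Actor with ActorLogging with Stash {
  import WaitForAllKeys._

  override def receive: Receive = waiting(allKeys)

  def waiting(remainingKeys: Set[String]): Receive = {
    case Ack(key) ⇒
      val newRemaining = remainingKeys - key
      if (newRemaining.isEmpty) {
        unstashAll()            // everything acknowledged: release stashed messages
        context.become(ready)
      } else
        context.become(waiting(newRemaining))
    case _ ⇒ stash()            // anything else waits until all keys are acknowledged
  }

  def ready: Receive = {
    case msg ⇒ log.info("ready, received {}", msg)
  }
}
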
ClusterShardingRememberEntitiesSpec.scala
@@ -72,6 +72,13 @@ abstract class ClusterShardingRememberEntitiesSpecConfig(val mode: String) exten
       map-size = 10 MiB
     }
     """))
+
+  nodeConfig(first, second)(ConfigFactory.parseString(s"""
+    akka.cluster.sharding.distributed-data.durable.lmdb {
+      # use same directory for first and second node (not used at same time)
+      dir = target/ShardingRememberEntitiesSpec/sharding-first-second
+    }
+    """))
 }

 object PersistentClusterShardingRememberEntitiesSpecConfig extends ClusterShardingRememberEntitiesSpecConfig(
@@ -133,7 +140,7 @@ abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememb

   def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData

-  s"Cluster with min-nr-of-members using sharding ($mode)" must {
+  s"Cluster sharding with remember entities ($mode)" must {

     if (!isDdataMode) {
       "setup shared journal" in {
@@ -144,7 +151,7 @@ abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememb
         }
         enterBarrier("peristence-started")

-        runOn(second, third) {
+        runOn(first, second, third) {
           system.actorSelection(node(first) / "user" / "store") ! Identify(None)
           val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get
           SharedLeveldbJournal.setStore(sharedStore, system)
@@ -195,6 +202,20 @@ abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememb
       enterBarrier("after-2")
     }

+    "start remembered entities in new cluster" in within(30.seconds) {
+      runOn(first) {
+        testConductor.exit(third, 0).await
+      }
+      enterBarrier("crash-third")
+
+      // no nodes left of the original cluster, start a new cluster
+      join(first, first)
+      runOn(first) {
+        startSharding()
+        expectMsgType[Started]
+      }
+      enterBarrier("after-3")
+    }
   }
 }
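The new test kills the last remaining node of the original cluster, forms a new one-node cluster and expects the remembered entity to come back (the Started message) without any new message being sent to it. Roughly, and outside the multi-node test kit, the sharding setup it relies on looks like the sketch below; the Envelope/EchoEntity types and the shard count are invented, only withRememberEntities(true) is the essential part:

import akka.actor.{ Actor, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }

final case class Envelope(entityId: String, payload: String)

class EchoEntity extends Actor {
  def receive: Receive = { case msg ⇒ sender() ! msg }
}

object RememberedEntitiesExample {
  val extractEntityId: ShardRegion.ExtractEntityId = {
    case Envelope(id, payload) ⇒ (id, payload)
  }
  val extractShardId: ShardRegion.ExtractShardId = {
    case Envelope(id, _) ⇒ (math.abs(id.hashCode) % 10).toString
  }

  // region with remember-entities enabled; with ddata mode and durable storage the
  // remembered shard ids survive a full cluster restart
  def startSharding(system: ActorSystem) =
    ClusterSharding(system).start(
      typeName = "Echo",
      entityProps = Props[EchoEntity],
      settings = ClusterShardingSettings(system).withRememberEntities(true),
      extractEntityId = extractEntityId,
      extractShardId = extractShardId)
}
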
MiMa.scala
@@ -1192,6 +1192,14 @@ object MiMa extends AutoPlugin {
       ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("akka.stream.Graph.async")
     ),
     "2.5.1" -> Seq(
+
+      // #22868 store shards
+      ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.sendUpdate"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.waitingForUpdate"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.getState"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.waitingForState"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShardCoordinator.this"),
+
       // #21213 Feature request: Let BackoffSupervisor reply to messages when its child is stopped
       ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.BackoffSupervisor.this"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.BackoffOptionsImpl.copy"),