=con #15440 Add invariant checks to ClusterSharding state

* I suspect that issue #15440 happens because the hbase journal replays
  events in the wrong order (ShardHomeAllocated received before
  ShardRegionRegistered)
* This does not fix that issue, but the additional invariant checks and
  debug statements should make it easier for us to diagnose such issues
  (see the sketch below)
* These changes also ensure that the allocation strategy does not return
  a region that is not one of the registered regions
* It also closes a possible error when a region is terminated while a
  rebalance is in progress
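
To make the first two bullets concrete, here is a minimal, hypothetical sketch (String ids instead of the real ActorRef-based state in akka-contrib, and only the two events involved) of how a journal that replays ShardHomeAllocated before ShardRegionRegistered now trips the new require in State.updated instead of silently building an inconsistent state:

// Minimal sketch, not the real ClusterSharding types: regions and shards are
// keyed by String here, and only two of the domain events are modelled.
object InvariantReplaySketch extends App {
  type Region = String
  type ShardId = String

  sealed trait DomainEvent
  final case class ShardRegionRegistered(region: Region) extends DomainEvent
  final case class ShardHomeAllocated(shard: ShardId, region: Region) extends DomainEvent

  final case class State(
    regions: Map[Region, Vector[ShardId]] = Map.empty,
    shards: Map[ShardId, Region] = Map.empty) {

    def updated(event: DomainEvent): State = event match {
      case ShardRegionRegistered(region) =>
        require(!regions.contains(region), s"Region $region already registered: $this")
        copy(regions = regions.updated(region, Vector.empty))
      case ShardHomeAllocated(shard, region) =>
        // The new invariant: allocating a shard to an unknown region fails fast,
        // which is exactly the symptom an out-of-order replay would produce.
        require(regions.contains(region), s"Region $region not registered: $this")
        require(!shards.contains(shard), s"Shard [$shard] already allocated: $this")
        copy(
          shards = shards.updated(shard, region),
          regions = regions.updated(region, regions(region) :+ shard))
    }
  }

  // Events replayed in the wrong order: the allocation arrives before the registration,
  // so the first updated() call throws IllegalArgumentException instead of corrupting state.
  val outOfOrder = List(ShardHomeAllocated("7", "regionA"), ShardRegionRegistered("regionA"))
  outOfOrder.foldLeft(State())((state, event) => state.updated(event))
}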

(cherry picked from commit d07b9db4958236d580b8bfb8f92461969ff88cbc)
Patrik Nordwall 2014-06-30 09:00:41 +02:00
parent 68a5675220
commit a188099f91

@@ -1098,21 +1098,29 @@ object ShardCoordinator {
     def updated(event: DomainEvent): State = event match {
       case ShardRegionRegistered(region) ⇒
+        require(!regions.contains(region), s"Region $region already registered: $this")
         copy(regions = regions.updated(region, Vector.empty))
       case ShardRegionProxyRegistered(proxy) ⇒
+        require(!regionProxies.contains(proxy), s"Region proxy $proxy already registered: $this")
         copy(regionProxies = regionProxies + proxy)
       case ShardRegionTerminated(region) ⇒
+        require(regions.contains(region), s"Terminated region $region not registered: $this")
         copy(
           regions = regions - region,
           shards = shards -- regions(region))
       case ShardRegionProxyTerminated(proxy) ⇒
+        require(regionProxies.contains(proxy), s"Terminated region proxy $proxy not registered: $this")
         copy(regionProxies = regionProxies - proxy)
       case ShardHomeAllocated(shard, region) ⇒
+        require(regions.contains(region), s"Region $region not registered: $this")
+        require(!shards.contains(shard), s"Shard [$shard] already allocated: $this")
         copy(
           shards = shards.updated(shard, region),
           regions = regions.updated(region, regions(region) :+ shard))
       case ShardHomeDeallocated(shard) ⇒
+        require(shards.contains(shard), s"Shard [$shard] not allocated: $this")
         val region = shards(shard)
+        require(regions.contains(region), s"Region $region for shard [$shard] not registered: $this")
         copy(
           shards = shards - shard,
           regions = regions.updated(region, regions(region).filterNot(_ == shard)))
@@ -1206,28 +1214,32 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, rebalanceInterval: Finite
   }
   override def receiveRecover: Receive = {
-    case evt: DomainEvent ⇒ evt match {
-      case ShardRegionRegistered(region) ⇒
-        persistentState = persistentState.updated(evt)
-      case ShardRegionProxyRegistered(proxy) ⇒
-        persistentState = persistentState.updated(evt)
-      case ShardRegionTerminated(region) ⇒
-        if (persistentState.regions.contains(region))
+    case evt: DomainEvent ⇒
+      log.debug("receiveRecover {}", evt)
+      evt match {
+        case ShardRegionRegistered(region) ⇒
           persistentState = persistentState.updated(evt)
-        else {
-          log.debug("ShardRegionTerminated, but region {} was not registered. This inconsistency is due to that " +
-            " some stored ActorRef in Akka v2.3.0 and v2.3.1 did not contain full address information. It will be " +
-            "removed by later watch.", region)
-        }
-      case ShardRegionProxyTerminated(proxy) ⇒
-        persistentState = persistentState.updated(evt)
-      case _: ShardHomeAllocated ⇒
-        persistentState = persistentState.updated(evt)
-      case _: ShardHomeDeallocated ⇒
-        persistentState = persistentState.updated(evt)
-    }
+        case ShardRegionProxyRegistered(proxy) ⇒
+          persistentState = persistentState.updated(evt)
+        case ShardRegionTerminated(region) ⇒
+          if (persistentState.regions.contains(region))
+            persistentState = persistentState.updated(evt)
+          else {
+            log.debug("ShardRegionTerminated, but region {} was not registered. This inconsistency is due to that " +
+              " some stored ActorRef in Akka v2.3.0 and v2.3.1 did not contain full address information. It will be " +
+              "removed by later watch.", region)
+          }
+        case ShardRegionProxyTerminated(proxy) ⇒
+          if (persistentState.regionProxies.contains(proxy))
+            persistentState = persistentState.updated(evt)
+        case ShardHomeAllocated(shard, region) ⇒
+          persistentState = persistentState.updated(evt)
+        case _: ShardHomeDeallocated ⇒
+          persistentState = persistentState.updated(evt)
+      }
     case SnapshotOffer(_, state: State) ⇒
+      log.debug("receiveRecover SnapshotOffer {}", state)
       persistentState = state
   }
@@ -1274,6 +1286,8 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, rebalanceInterval: Finite
         case None ⇒
           if (persistentState.regions.nonEmpty) {
             val region = allocationStrategy.allocateShard(sender(), shard, persistentState.regions)
+            require(persistentState.regions.contains(region),
+              s"Allocated region $region for shard [$shard] must be one of the registered regions: $persistentState")
             persist(ShardHomeAllocated(shard, region)) { evt ⇒
               persistentState = persistentState.updated(evt)
               log.debug("Shard [{}] allocated at [{}]", evt.shard, evt.region)
@@ -1296,10 +1310,12 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, rebalanceInterval: Finite
     case RebalanceDone(shard, ok) ⇒
       rebalanceInProgress -= shard
       log.debug("Rebalance shard [{}] done [{}]", shard, ok)
-      if (ok) persist(ShardHomeDeallocated(shard)) { evt ⇒
-        persistentState = persistentState.updated(evt)
-        log.debug("Shard [{}] deallocated", evt.shard)
-      }
+      // The shard could have been removed by ShardRegionTerminated
+      if (ok && persistentState.shards.contains(shard))
+        persist(ShardHomeDeallocated(shard)) { evt ⇒
+          persistentState = persistentState.updated(evt)
+          log.debug("Shard [{}] deallocated", evt.shard)
+        }
     case SnapshotTick ⇒
       log.debug("Saving persistent snapshot")