Issue #18945. Fix HandOff problem where Shards may not be killed.

Issue #18945. Fix HandOff problem where Shards may not be killed.
This commit is contained in:
Marcin Sosnicki 2015-11-21 15:58:26 +00:00
parent a7021827c3
commit e77bb2fa45

View file

@ -366,6 +366,7 @@ class ShardRegion(
var loggedFullBufferWarning = false
var shards = Map.empty[ShardId, ActorRef]
var shardsByRef = Map.empty[ActorRef, ShardId]
var startingShards = Set.empty[ShardId]
var handingOff = Set.empty[ActorRef]
var gracefulShutdownInProgress = false
@ -560,6 +561,7 @@ class ShardRegion(
shardsByRef = shardsByRef - ref
shards = shards - shardId
startingShards -= shardId
if (handingOff.contains(ref)) {
handingOff = handingOff - ref
log.debug("Shard [{}] handoff complete", shardId)
@ -632,7 +634,7 @@ class ShardRegion(
def initializeShard(id: ShardId, shard: ActorRef): Unit = {
log.debug("Shard was initialized {}", id)
shards = shards.updated(id, shard)
startingShards -= id
deliverBufferedMessages(id, shard)
}
@ -722,29 +724,38 @@ class ShardRegion(
}
}
def getShard(id: ShardId): Option[ActorRef] = shards.get(id).orElse(
entityProps match {
case Some(props) if !shardsByRef.values.exists(_ == id)
log.debug("Starting shard [{}] in region", id)
def getShard(id: ShardId): Option[ActorRef] = {
if (startingShards.contains(id))
None
else {
shards.get(id).orElse(
entityProps match {
case Some(props) if !shardsByRef.values.exists(_ == id)
log.debug("Starting shard [{}] in region", id)
val name = URLEncoder.encode(id, "utf-8")
val shard = context.watch(context.actorOf(
Shard.props(
typeName,
id,
props,
settings,
extractEntityId,
extractShardId,
handOffStopMessage).withDispatcher(context.props.dispatcher),
name))
shardsByRef = shardsByRef.updated(shard, id)
None
case Some(props)
None
case None
throw new IllegalStateException("Shard must not be allocated to a proxy only ShardRegion")
})
val name = URLEncoder.encode(id, "utf-8")
val shard = context.watch(context.actorOf(
Shard.props(
typeName,
id,
props,
settings,
extractEntityId,
extractShardId,
handOffStopMessage).withDispatcher(context.props.dispatcher),
name))
shardsByRef = shardsByRef.updated(shard, id)
shards = shards.updated(id, shard)
startingShards += id
None
case Some(props)
None
case None
throw new IllegalStateException("Shard must not be allocated to a proxy only ShardRegion")
}
)
}
}
def sendGracefulShutdownToCoordinator(): Unit =
if (gracefulShutdownInProgress)