ShardRestart shouldn't go to userspace #21145
Due to the order of cases in the pattern match, the `ShardRestart` message could be received by the user-defined `messageExtractorId` (when set by providing a `MessageExtractor` that does not return null for unknown messages). To avoid this it is sufficient to change the order of the cases in the pattern match so that `RestartShard` is handled first. Also added a note to the documentation for `MessageExtractor` clarifying that it should return null for unhandled messages.
This commit is contained in:
parent
22d669f7f0
commit
5caa23db97
3 changed files with 7 additions and 7 deletions
|
|
@ -258,7 +258,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
|
|||
* @param entityProps the `Props` of the entity actors that will be created by the `ShardRegion`
|
||||
* @param settings configuration settings, see [[ClusterShardingSettings]]
|
||||
* @param messageExtractor functions to extract the entity id, shard id, and the message to send to the
|
||||
* entity from the incoming message
|
||||
* entity from the incoming message, see [[ShardRegion.MessageExtractor]]
|
||||
* @param allocationStrategy possibility to use a custom shard allocation and
|
||||
* rebalancing logic
|
||||
* @param handOffStopMessage the message that will be sent to entities when they are to be stopped
|
||||
|
|
|
|||
|
|
@ -367,8 +367,8 @@ object ShardCoordinator {
|
|||
}
|
||||
|
||||
def stoppingShard: Receive = {
|
||||
case ShardStopped(shard) ⇒ done(ok = true)
|
||||
case ReceiveTimeout ⇒ done(ok = false)
|
||||
case ShardStopped(`shard`) ⇒ done(ok = true)
|
||||
case ReceiveTimeout ⇒ done(ok = false)
|
||||
}
|
||||
|
||||
def done(ok: Boolean): Unit = {
|
||||
|
|
|
|||
|
|
@ -413,8 +413,8 @@ class ShardRegion(
|
|||
case msg: CoordinatorMessage ⇒ receiveCoordinatorMessage(msg)
|
||||
case cmd: ShardRegionCommand ⇒ receiveCommand(cmd)
|
||||
case query: ShardRegionQuery ⇒ receiveQuery(query)
|
||||
case msg if extractEntityId.isDefinedAt(msg) ⇒ deliverMessage(msg, sender())
|
||||
case msg: RestartShard ⇒ deliverMessage(msg, sender())
|
||||
case msg if extractEntityId.isDefinedAt(msg) ⇒ deliverMessage(msg, sender())
|
||||
}
|
||||
|
||||
def receiveClusterState(state: CurrentClusterState): Unit = {
|
||||
|
|
@ -454,7 +454,7 @@ class ShardRegion(
|
|||
regionByShard.get(shard) match {
|
||||
case Some(r) if r == self && ref != self ⇒
|
||||
// should not happen, inconsistency between ShardRegion and ShardCoordinator
|
||||
throw new IllegalStateException(s"Unexpected change of shard [${shard}] from self to [${ref}]")
|
||||
throw new IllegalStateException(s"Unexpected change of shard [$shard] from self to [$ref]")
|
||||
case _ ⇒
|
||||
}
|
||||
regionByShard = regionByShard.updated(shard, ref)
|
||||
|
|
@ -546,7 +546,7 @@ class ShardRegion(
|
|||
}
|
||||
|
||||
def receiveTerminated(ref: ActorRef): Unit = {
|
||||
if (coordinator.exists(_ == ref))
|
||||
if (coordinator.contains(ref))
|
||||
coordinator = None
|
||||
else if (regions.contains(ref)) {
|
||||
val shards = regions(ref)
|
||||
|
|
@ -711,7 +711,7 @@ class ShardRegion(
|
|||
case Some(ref) ⇒
|
||||
log.debug("Forwarding request for shard [{}] to [{}]", shardId, ref)
|
||||
ref.tell(msg, snd)
|
||||
case None if (shardId == null || shardId == "") ⇒
|
||||
case None if shardId == null || shardId == "" ⇒
|
||||
log.warning("Shard must not be empty, dropping message [{}]", msg.getClass.getName)
|
||||
context.system.deadLetters ! msg
|
||||
case None ⇒
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue