From 5caa23db976bed5654a95bd9daf560e381759ea4 Mon Sep 17 00:00:00 2001 From: Kirill Plyashkevich Date: Wed, 10 Aug 2016 12:51:50 +0300 Subject: [PATCH] ShardRestart shouldn't go to userspace #21145 Due to order in pattern match `ShardRestart` message can be received in user-defined `messageExtractorId` (when set via providing `MessageExtractor` which is not returning null in case of unknown messages). To avoid this it's sufficient to only change the order in the pattern match. Also added note to documentation for using `MessageExtractor` to clarify that it should return null in case of unhandled message. --- .../scala/akka/cluster/sharding/ClusterSharding.scala | 2 +- .../scala/akka/cluster/sharding/ShardCoordinator.scala | 4 ++-- .../main/scala/akka/cluster/sharding/ShardRegion.scala | 8 ++++---- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala index 631d064939..f063268ccc 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala @@ -258,7 +258,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { * @param entityProps the `Props` of the entity actors that will be created by the `ShardRegion` * @param settings configuration settings, see [[ClusterShardingSettings]] * @param messageExtractor functions to extract the entity id, shard id, and the message to send to the - * entity from the incoming message + * entity from the incoming message, see [[ShardRegion.MessageExtractor]] * @param allocationStrategy possibility to use a custom shard allocation and * rebalancing logic * @param handOffStopMessage the message that will be sent to entities when they are to be stopped diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala index 370a635f1f..e980f7ab9c 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala @@ -367,8 +367,8 @@ object ShardCoordinator { } def stoppingShard: Receive = { - case ShardStopped(shard) ⇒ done(ok = true) - case ReceiveTimeout ⇒ done(ok = false) + case ShardStopped(`shard`) ⇒ done(ok = true) + case ReceiveTimeout ⇒ done(ok = false) } def done(ok: Boolean): Unit = { diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index c73eeb8f64..5593191c06 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -413,8 +413,8 @@ class ShardRegion( case msg: CoordinatorMessage ⇒ receiveCoordinatorMessage(msg) case cmd: ShardRegionCommand ⇒ receiveCommand(cmd) case query: ShardRegionQuery ⇒ receiveQuery(query) - case msg if extractEntityId.isDefinedAt(msg) ⇒ deliverMessage(msg, sender()) case msg: RestartShard ⇒ deliverMessage(msg, sender()) + case msg if extractEntityId.isDefinedAt(msg) ⇒ deliverMessage(msg, sender()) } def receiveClusterState(state: CurrentClusterState): Unit = { @@ -454,7 +454,7 @@ class ShardRegion( regionByShard.get(shard) match { case Some(r) if r == self && ref != self ⇒ // should not happen, inconsistency between ShardRegion and ShardCoordinator - throw new IllegalStateException(s"Unexpected change of shard [${shard}] from self to [${ref}]") + throw new IllegalStateException(s"Unexpected change of shard [$shard] from self to [$ref]") case _ ⇒ } regionByShard = regionByShard.updated(shard, ref) @@ -546,7 +546,7 @@ class ShardRegion( } def receiveTerminated(ref: ActorRef): Unit = { - if (coordinator.exists(_ == ref)) + if (coordinator.contains(ref)) coordinator = None else if (regions.contains(ref)) { val shards = regions(ref) @@ -711,7 +711,7 @@ class ShardRegion( case Some(ref) ⇒ log.debug("Forwarding request for shard [{}] to [{}]", shardId, ref) ref.tell(msg, snd) - case None if (shardId == null || shardId == "") ⇒ + case None if shardId == null || shardId == "" ⇒ log.warning("Shard must not be empty, dropping message [{}]", msg.getClass.getName) context.system.deadLetters ! msg case None ⇒