diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingQuery.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingQuery.scala index 29a016eaff..2b5037d868 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingQuery.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingQuery.scala @@ -4,9 +4,13 @@ package akka.cluster.sharding.typed +import scala.concurrent.duration.FiniteDuration + import akka.actor.typed.ActorRef +import akka.cluster.sharding.ShardRegion.ClusterShardingStats import akka.cluster.sharding.ShardRegion.CurrentShardRegionState import akka.cluster.sharding.typed.scaladsl.EntityTypeKey +import akka.util.JavaDurationConverters /** * Protocol for querying sharding state e.g. A ShardRegion's state @@ -16,6 +20,11 @@ sealed trait ClusterShardingQuery /** * Query the ShardRegion state for the given entity type key. This will get the state of the * local ShardRegion's state. + * + * Intended for testing purpose to see when cluster sharding is "ready" or to monitor + * the state of the shard regions. + * + * For the statistics for the entire cluster, see [[GetClusterShardingStats]]. */ final case class GetShardRegionState(entityTypeKey: EntityTypeKey[_], replyTo: ActorRef[CurrentShardRegionState]) extends ClusterShardingQuery { @@ -30,4 +39,33 @@ final case class GetShardRegionState(entityTypeKey: EntityTypeKey[_], replyTo: A this(entityTypeKey.asScala, replyTo) } -// TODO - GetClusterShardingStats +/** + * Query the statistics about the currently running sharded entities in the + * entire cluster. If the given `timeout` is reached without answers from all + * shard regions the reply will contain an empty map of regions. + * + * Intended for testing purpose to see when cluster sharding is "ready" or to monitor + * the state of the shard regions. + * + * @param timeout the timeout applied to querying all alive regions + * @param replyTo the actor to send the result to + */ +final case class GetClusterShardingStats( + entityTypeKey: EntityTypeKey[_], + timeout: FiniteDuration, + replyTo: ActorRef[ClusterShardingStats]) + extends ClusterShardingQuery { + + /** + * Java API + * + * Query the statistics about the currently running sharded entities in the + * entire cluster. If the given `timeout` is reached without answers from all + * shard regions the reply will contain an empty map of regions. + */ + def this( + entityTypeKey: javadsl.EntityTypeKey[_], + timeout: java.time.Duration, + replyTo: ActorRef[ClusterShardingStats]) = + this(entityTypeKey.asScala, JavaDurationConverters.asFiniteDuration(timeout), replyTo) +} diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardingState.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardingState.scala index 1f08f0c1c1..ec74d2b7c9 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardingState.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardingState.scala @@ -3,27 +3,64 @@ */ package akka.cluster.sharding.typed.internal + import akka.actor.typed.Behavior +import akka.actor.typed.SupervisorStrategy import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.adapter._ import akka.annotation.InternalApi -import akka.cluster.sharding.{ ClusterSharding, ShardRegion } +import akka.cluster.sharding.ClusterSharding +import akka.cluster.sharding.ShardRegion import akka.cluster.sharding.ShardRegion.CurrentShardRegionState -import akka.cluster.sharding.typed.{ ClusterShardingQuery, GetShardRegionState } +import akka.cluster.sharding.typed.ClusterShardingQuery +import akka.cluster.sharding.typed.GetClusterShardingStats +import akka.cluster.sharding.typed.GetShardRegionState /** * INTERNAL API */ @InternalApi private[akka] object ShardingState { - def behavior(classicSharding: ClusterSharding): Behavior[ClusterShardingQuery] = Behaviors.receiveMessage { - case GetShardRegionState(key, replyTo) => - if (classicSharding.getShardTypeNames.contains(key.name)) { - classicSharding.shardRegion(key.name).tell(ShardRegion.GetShardRegionState, replyTo.toClassic) - } else { - replyTo ! CurrentShardRegionState(Set.empty) + def behavior(classicSharding: ClusterSharding): Behavior[ClusterShardingQuery] = { + Behaviors + .supervise[ClusterShardingQuery] { + Behaviors.setup { context => + Behaviors.receiveMessage { + case GetShardRegionState(key, replyTo) => + if (classicSharding.getShardTypeNames.contains(key.name)) { + try { + classicSharding.shardRegion(key.name).tell(ShardRegion.GetShardRegionState, replyTo.toClassic) + } catch { + case e: IllegalStateException => + // classicSharding.shardRegion may throw if not initialized + context.log.warn(e.getMessage) + replyTo ! CurrentShardRegionState(Set.empty) + } + } else { + replyTo ! CurrentShardRegionState(Set.empty) + } + Behaviors.same + + case GetClusterShardingStats(key, timeout, replyTo) => + if (classicSharding.getShardTypeNames.contains(key.name)) { + try { + classicSharding + .shardRegion(key.name) + .tell(ShardRegion.GetClusterShardingStats(timeout), replyTo.toClassic) + } catch { + case e: IllegalStateException => + // classicSharding.shardRegion may throw if not initialized + context.log.warn(e.getMessage) + replyTo ! ShardRegion.ClusterShardingStats(Map.empty) + } + } else { + replyTo ! ShardRegion.ClusterShardingStats(Map.empty) + } + Behaviors.same + } + } } - Behaviors.same + .onFailure(SupervisorStrategy.restart) } } diff --git a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingStatsSpec.scala b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingStatsSpec.scala new file mode 100644 index 0000000000..d441b29a08 --- /dev/null +++ b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingStatsSpec.scala @@ -0,0 +1,108 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.cluster.sharding.typed + +import com.typesafe.config.ConfigFactory +import org.scalatest.concurrent.ScalaFutures + +import akka.actor.testkit.typed.scaladsl.TestProbe +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.Behaviors +import akka.cluster.MultiNodeClusterSpec +import akka.cluster.sharding.ShardRegion.ClusterShardingStats +import akka.cluster.sharding.typed.scaladsl.ClusterSharding +import akka.cluster.sharding.typed.scaladsl.Entity +import akka.cluster.sharding.typed.scaladsl.EntityTypeKey +import akka.cluster.typed.MultiNodeTypedClusterSpec +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.serialization.jackson.CborSerializable + +object ClusterShardingStatsSpecConfig extends MultiNodeConfig { + + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig(ConfigFactory.parseString(""" + akka.log-dead-letters-during-shutdown = off + akka.cluster.sharding.updating-state-timeout = 2s + akka.cluster.sharding.waiting-for-state-timeout = 2s + """).withFallback(MultiNodeClusterSpec.clusterConfig)) + +} + +class ClusterShardingStatsSpecMultiJvmNode1 extends ClusterShardingStatsSpec +class ClusterShardingStatsSpecMultiJvmNode2 extends ClusterShardingStatsSpec +class ClusterShardingStatsSpecMultiJvmNode3 extends ClusterShardingStatsSpec + +object Pinger { + sealed trait Command extends CborSerializable + case class Ping(id: Int, ref: ActorRef[Pong]) extends Command + case class Pong(id: Int) extends CborSerializable + + def apply(): Behavior[Command] = { + Behaviors.receiveMessage[Command] { + case Ping(id: Int, ref) => + ref ! Pong(id) + Behaviors.same + } + } + +} + +abstract class ClusterShardingStatsSpec + extends MultiNodeSpec(ClusterShardingStatsSpecConfig) + with MultiNodeTypedClusterSpec + with ScalaFutures { + + import ClusterShardingStatsSpecConfig._ + import Pinger._ + + private val typeKey = EntityTypeKey[Command]("ping") + + private val sharding = ClusterSharding(typedSystem) + private val settings = ClusterShardingSettings(typedSystem) + private val queryTimeout = settings.shardRegionQueryTimeout * roles.size.toLong //numeric widening y'all + + "Cluster sharding stats" must { + "form cluster" in { + formCluster(first, second, third) + } + + "get shard stats" in { + sharding.init(Entity(typeKey)(_ => Pinger())) + enterBarrier("sharding started") + + runOn(first) { + val pongProbe = TestProbe[Pong]() + + val entityRef1 = ClusterSharding(typedSystem).entityRefFor(typeKey, "ping-1") + entityRef1 ! Ping(1, pongProbe.ref) + pongProbe.receiveMessage() + + val entityRef2 = ClusterSharding(typedSystem).entityRefFor(typeKey, "ping-2") + entityRef2 ! Ping(2, pongProbe.ref) + pongProbe.receiveMessage() + } + enterBarrier("sharding-initialized") + + runOn(first, second, third) { + val replyToProbe = TestProbe[ClusterShardingStats]() + sharding.shardState ! GetClusterShardingStats(typeKey, queryTimeout, replyToProbe.ref) + + val stats = replyToProbe.receiveMessage(queryTimeout) + stats.regions.size shouldEqual 3 + stats.regions.values.flatMap(_.stats.values).sum shouldEqual 2 + stats.regions.values.forall(_.failed.isEmpty) shouldBe true + } + enterBarrier("done") + + } + + } + +} diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java index bd72ed5879..450e56db6b 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java @@ -24,6 +24,17 @@ import akka.persistence.typed.PersistenceId; // #import +// #get-shard-region-state +import akka.cluster.sharding.typed.GetShardRegionState; +import akka.cluster.sharding.ShardRegion.CurrentShardRegionState; + +// #get-shard-region-state +// #get-cluster-sharding-stats +import akka.cluster.sharding.typed.GetClusterShardingStats; +import akka.cluster.sharding.ShardRegion.ClusterShardingStats; + +// #get-cluster-sharding-stats + import jdocs.akka.persistence.typed.BlogPostEntity; interface ShardingCompileOnlyTest { @@ -248,4 +259,31 @@ interface ShardingCompileOnlyTest { ClusterSharding.get(system).entityRefFor(typeKey, entityId, "dc2"); // #proxy-dc-entityref } + + public static void shardRegionQqueryExample() { + ActorSystem system = ActorSystem.create(Behaviors.empty(), "ShardingExample"); + ActorRef replyMessageAdapter = null; + EntityTypeKey typeKey = EntityTypeKey.create(Counter.Command.class, "Counter"); + + // #get-shard-region-state + ActorRef replyTo = replyMessageAdapter; + + ClusterSharding.get(system).shardState().tell(new GetShardRegionState(typeKey, replyTo)); + // #get-shard-region-state + } + + public static void shardingStatsQqueryExample() { + ActorSystem system = ActorSystem.create(Behaviors.empty(), "ShardingExample"); + ActorRef replyMessageAdapter = null; + EntityTypeKey typeKey = EntityTypeKey.create(Counter.Command.class, "Counter"); + + // #get-cluster-sharding-stats + ActorRef replyTo = replyMessageAdapter; + Duration timeout = Duration.ofSeconds(5); + + ClusterSharding.get(system) + .shardState() + .tell(new GetClusterShardingStats(typeKey, timeout, replyTo)); + // #get-cluster-sharding-stats + } } diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingStateSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingStateSpec.scala index 558a3413c3..f47ccdd52c 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingStateSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingStateSpec.scala @@ -19,19 +19,19 @@ class ClusterShardingStateSpec with AnyWordSpecLike with LogCapturing { - val sharding = ClusterSharding(system) + private val sharding = ClusterSharding(system) - val shardExtractor = ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) { + private val shardExtractor = ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) { case IdReplyPlz(id, _) => id case IdWhoAreYou(id, _) => id case other => throw new IllegalArgumentException(s"Unexpected message $other") } - val cluster = Cluster(system) + private val cluster = Cluster(system) val typeKey: EntityTypeKey[IdTestProtocol] = ClusterShardingSpec.typeKeyWithoutEnvelopes - "Cluster Sharding" must { + "Cluster Sharding CurrentShardRegionState query" must { "allow querying of the shard region state" in { val probe = TestProbe[CurrentShardRegionState]() cluster.manager ! Join(cluster.selfMember.address) diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingStatsSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingStatsSpec.scala new file mode 100644 index 0000000000..628d2f7a54 --- /dev/null +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingStatsSpec.scala @@ -0,0 +1,74 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.cluster.sharding.typed.scaladsl + +import org.scalatest.wordspec.AnyWordSpecLike + +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.ActorRef +import akka.cluster.sharding.ShardRegion.ClusterShardingStats +import akka.cluster.sharding.ShardRegion.ShardRegionStats +import akka.cluster.sharding.typed.ClusterShardingSettings +import akka.cluster.sharding.typed.GetClusterShardingStats +import akka.cluster.sharding.typed.scaladsl.ClusterShardingSpec._ +import akka.cluster.typed.Cluster +import akka.cluster.typed.Join +import akka.cluster.typed.SelfUp + +class ClusterShardingStatsSpec + extends ScalaTestWithActorTestKit(ClusterShardingSpec.config) + with AnyWordSpecLike + with LogCapturing { + + private val sharding = ClusterSharding(system) + + private val typeKey: EntityTypeKey[IdTestProtocol] = ClusterShardingSpec.typeKeyWithoutEnvelopes + + private val shardExtractor = ClusterShardingSpec.idTestProtocolMessageExtractor + + // no need to scale this up here for the cluster query versus one region + private val queryTimeout = ClusterShardingSettings(system).shardRegionQueryTimeout + + "Cluster Sharding ClusterShardingStats query" must { + "return empty statistics if there are no running sharded entities" in { + val cluster = Cluster(system) + val upProbe = createTestProbe[SelfUp]() + + cluster.subscriptions ! akka.cluster.typed.Subscribe(upProbe.ref, classOf[SelfUp]) + cluster.manager ! Join(cluster.selfMember.address) + upProbe.expectMessageType[SelfUp] + + val replyProbe = createTestProbe[ClusterShardingStats]() + sharding.shardState ! GetClusterShardingStats(typeKey, queryTimeout, replyProbe.ref) + replyProbe.expectMessage(ClusterShardingStats(Map.empty)) + } + + "allow querying of statistics of the currently running sharded entities in the entire cluster" in { + val shardingRef: ActorRef[IdTestProtocol] = sharding.init( + Entity(typeKey)(_ => ClusterShardingSpec.behaviorWithId()) + .withStopMessage(IdStopPlz()) + .withMessageExtractor(idTestProtocolMessageExtractor)) + + val replyProbe = createTestProbe[String]() + val id1 = "id1" + shardingRef ! IdReplyPlz(id1, replyProbe.ref) + replyProbe.expectMessage("Hello!") + + val replyToProbe = createTestProbe[ClusterShardingStats]() + val replyTo = replyToProbe.ref + + ClusterSharding(system).shardState ! GetClusterShardingStats(typeKey, queryTimeout, replyTo) + val stats = replyToProbe.receiveMessage() + + val expect = ClusterShardingStats( + Map(Cluster(system).selfMember.address -> ShardRegionStats(Map(shardExtractor.shardId(id1) -> 1), Set.empty))) + + stats shouldEqual expect + } + + } + +} diff --git a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala index c6c8e8e52e..7af737140e 100644 --- a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala @@ -14,6 +14,9 @@ import com.github.ghik.silencer.silent import docs.akka.persistence.typed.BlogPostEntity import docs.akka.persistence.typed.BlogPostEntity.Command +import akka.cluster.sharding.typed.scaladsl.ClusterSharding +import akka.cluster.sharding.typed.scaladsl.EntityTypeKey + @silent object ShardingCompileOnlySpec { @@ -169,4 +172,42 @@ object ShardingCompileOnlySpec { //#sharded-response } + object ShardRegionStateQuery { + + object Counter { + val TypeKey = EntityTypeKey[Basics.Counter.Command]("Counter") + } + + val replyMessageAdapter: ActorRef[akka.cluster.sharding.ShardRegion.CurrentShardRegionState] = ??? + + //#get-shard-region-state + import akka.cluster.sharding.typed.GetShardRegionState + import akka.cluster.sharding.ShardRegion.CurrentShardRegionState + + val replyTo: ActorRef[CurrentShardRegionState] = replyMessageAdapter + + ClusterSharding(system).shardState ! GetShardRegionState(Counter.TypeKey, replyTo) + //#get-shard-region-state + } + + object ClusterShardingStatsQuery { + + object Counter { + val TypeKey = EntityTypeKey[Basics.Counter.Command]("Counter") + } + + val replyMessageAdapter: ActorRef[akka.cluster.sharding.ShardRegion.ClusterShardingStats] = ??? + + //#get-cluster-sharding-stats + import akka.cluster.sharding.typed.GetClusterShardingStats + import akka.cluster.sharding.ShardRegion.ClusterShardingStats + import scala.concurrent.duration._ + + val replyTo: ActorRef[ClusterShardingStats] = replyMessageAdapter + val timeout: FiniteDuration = 5.seconds + + ClusterSharding(system).shardState ! GetClusterShardingStats(Counter.TypeKey, timeout, replyTo) + //#get-cluster-sharding-stats + } + } diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index 54cf382c8e..42be1906a1 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -223,6 +223,7 @@ object ShardRegion { /** * Send this message to the `ShardRegion` actor to request for [[CurrentRegions]], * which contains the addresses of all registered regions. + * * Intended for testing purpose to see when cluster sharding is "ready" or to monitor * the state of the shard regions. */ @@ -281,6 +282,7 @@ object ShardRegion { * Send this message to the `ShardRegion` actor to request for [[ShardRegionStats]], * which contains statistics about the currently running sharded entities in the * entire region. + * * Intended for testing purpose to see when cluster sharding is "ready" or to monitor * the state of the shard regions. * diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala index 8d2d1fbfa2..25ccdbd0d5 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala @@ -6,11 +6,12 @@ package akka.cluster.sharding import scala.concurrent.duration._ -import com.typesafe.config.ConfigFactory - import akka.actor._ -import akka.cluster.{ Cluster, MemberStatus } -import akka.testkit.{ TestDuration, TestProbe } +import akka.cluster.Cluster +import akka.cluster.MemberStatus +import akka.testkit.TestDuration +import akka.testkit.TestProbe +import com.typesafe.config.ConfigFactory object ClusterShardingGetStatsSpec { import MultiNodeClusterShardingSpec.PingPongActor diff --git a/akka-docs/src/main/paradox/typed/cluster-sharding.md b/akka-docs/src/main/paradox/typed/cluster-sharding.md index 3e1aa3e4d1..655a1e6fb8 100644 --- a/akka-docs/src/main/paradox/typed/cluster-sharding.md +++ b/akka-docs/src/main/paradox/typed/cluster-sharding.md @@ -374,6 +374,36 @@ rebalanced to other nodes. See @ref:[How To Startup when Cluster Size Reached](cluster.md#how-to-startup-when-a-cluster-size-is-reached) for more information about `min-nr-of-members`. +## Inspecting cluster sharding state + +Two requests to inspect the cluster state are available: + +@apidoc[akka.cluster.sharding.typed.GetShardRegionState] which will reply with a +@apidoc[ShardRegion.CurrentShardRegionState] that contains the identifiers of the shards running in +a Region and what entities are alive for each of them. + +Scala +: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #get-shard-region-state } + +Java +: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #get-shard-region-state } + +@apidoc[akka.cluster.sharding.typed.GetClusterShardingStats] which will query all the regions in the cluster and reply with a +@apidoc[ShardRegion.ClusterShardingStats] containing the identifiers of the shards running in each region and a count +of entities that are alive in each shard. + +Scala +: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #get-cluster-sharding-stats } + +Java +: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #get-cluster-sharding-stats } + +If any shard queries failed, for example due to timeout if a shard was too busy to reply within the configured `akka.cluster.sharding.shard-region-query-timeout`, +`ShardRegion.CurrentShardRegionState` and `ShardRegion.ClusterShardingStats` will also include the set of shard identifiers by region that failed. + +The purpose of these messages is testing and monitoring, they are not provided to give access to +directly sending messages to the individual entities. + ## Lease A @ref[lease](../coordination.md) can be used as an additional safety measure to ensure a shard