diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingQuery.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingQuery.scala index 576db38c9e..2b5037d868 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingQuery.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingQuery.scala @@ -4,13 +4,14 @@ package akka.cluster.sharding.typed +import scala.concurrent.duration.FiniteDuration + import akka.actor.typed.ActorRef -import akka.cluster.sharding.ShardRegion.{ ClusterShardingStats, CurrentShardRegionState } +import akka.cluster.sharding.ShardRegion.ClusterShardingStats +import akka.cluster.sharding.ShardRegion.CurrentShardRegionState import akka.cluster.sharding.typed.scaladsl.EntityTypeKey import akka.util.JavaDurationConverters -import scala.concurrent.duration.FiniteDuration - /** * Protocol for querying sharding state e.g. A ShardRegion's state */ @@ -19,6 +20,11 @@ sealed trait ClusterShardingQuery /** * Query the ShardRegion state for the given entity type key. This will get the state of the * local ShardRegion's state. + * + * Intended for testing purpose to see when cluster sharding is "ready" or to monitor + * the state of the shard regions. + * + * For the statistics for the entire cluster, see [[GetClusterShardingStats]]. */ final case class GetShardRegionState(entityTypeKey: EntityTypeKey[_], replyTo: ActorRef[CurrentShardRegionState]) extends ClusterShardingQuery { @@ -38,10 +44,16 @@ final case class GetShardRegionState(entityTypeKey: EntityTypeKey[_], replyTo: A * entire cluster. If the given `timeout` is reached without answers from all * shard regions the reply will contain an empty map of regions. * + * Intended for testing purpose to see when cluster sharding is "ready" or to monitor + * the state of the shard regions. + * * @param timeout the timeout applied to querying all alive regions * @param replyTo the actor to send the result to */ -final case class GetClusterShardingStats(timeout: FiniteDuration, replyTo: ActorRef[ClusterShardingStats]) +final case class GetClusterShardingStats( + entityTypeKey: EntityTypeKey[_], + timeout: FiniteDuration, + replyTo: ActorRef[ClusterShardingStats]) extends ClusterShardingQuery { /** @@ -51,6 +63,9 @@ final case class GetClusterShardingStats(timeout: FiniteDuration, replyTo: Actor * entire cluster. If the given `timeout` is reached without answers from all * shard regions the reply will contain an empty map of regions. */ - def this(timeout: java.time.Duration, replyTo: ActorRef[ClusterShardingStats]) = - this(JavaDurationConverters.asFiniteDuration(timeout), replyTo) + def this( + entityTypeKey: javadsl.EntityTypeKey[_], + timeout: java.time.Duration, + replyTo: ActorRef[ClusterShardingStats]) = + this(entityTypeKey.asScala, JavaDurationConverters.asFiniteDuration(timeout), replyTo) } diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardingState.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardingState.scala index eec7b4f7ee..ec74d2b7c9 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardingState.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardingState.scala @@ -4,61 +4,63 @@ package akka.cluster.sharding.typed.internal -import scala.concurrent.Future - import akka.actor.typed.Behavior +import akka.actor.typed.SupervisorStrategy import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.adapter._ import akka.annotation.InternalApi +import akka.cluster.sharding.ClusterSharding +import akka.cluster.sharding.ShardRegion import akka.cluster.sharding.ShardRegion.CurrentShardRegionState import akka.cluster.sharding.typed.ClusterShardingQuery import akka.cluster.sharding.typed.GetClusterShardingStats import akka.cluster.sharding.typed.GetShardRegionState -import akka.cluster.sharding.ClusterSharding -import akka.cluster.sharding.ShardRegion -import akka.pattern.AskTimeoutException -import akka.util.Timeout /** * INTERNAL API */ @InternalApi private[akka] object ShardingState { - def behavior(classicSharding: ClusterSharding): Behavior[ClusterShardingQuery] = - Behaviors.setup { context => - Behaviors.receiveMessage { - case GetShardRegionState(key, replyTo) => - if (classicSharding.getShardTypeNames.contains(key.name)) { - classicSharding.shardRegion(key.name).tell(ShardRegion.GetShardRegionState, replyTo.toClassic) - } else { - replyTo ! CurrentShardRegionState(Set.empty) - } - Behaviors.same - - case GetClusterShardingStats(timeout, replyTo) => - import akka.pattern.ask - import akka.pattern.pipe - implicit val t: Timeout = timeout - implicit val ec = context.system.executionContext - - val regions = classicSharding.shardTypeNames.map(classicSharding.shardRegion) - - if (regions.nonEmpty) { - Future - .firstCompletedOf(regions.map { region => - (region ? ShardRegion.GetClusterShardingStats(timeout)).mapTo[ShardRegion.ClusterShardingStats] - }) - .recover { - case _: AskTimeoutException => - ShardRegion.ClusterShardingStats(Map.empty) + def behavior(classicSharding: ClusterSharding): Behavior[ClusterShardingQuery] = { + Behaviors + .supervise[ClusterShardingQuery] { + Behaviors.setup { context => + Behaviors.receiveMessage { + case GetShardRegionState(key, replyTo) => + if (classicSharding.getShardTypeNames.contains(key.name)) { + try { + classicSharding.shardRegion(key.name).tell(ShardRegion.GetShardRegionState, replyTo.toClassic) + } catch { + case e: IllegalStateException => + // classicSharding.shardRegion may throw if not initialized + context.log.warn(e.getMessage) + replyTo ! CurrentShardRegionState(Set.empty) + } + } else { + replyTo ! CurrentShardRegionState(Set.empty) } - .pipeTo(replyTo.toClassic) - } else { - replyTo ! ShardRegion.ClusterShardingStats(Map.empty) - } + Behaviors.same - Behaviors.same + case GetClusterShardingStats(key, timeout, replyTo) => + if (classicSharding.getShardTypeNames.contains(key.name)) { + try { + classicSharding + .shardRegion(key.name) + .tell(ShardRegion.GetClusterShardingStats(timeout), replyTo.toClassic) + } catch { + case e: IllegalStateException => + // classicSharding.shardRegion may throw if not initialized + context.log.warn(e.getMessage) + replyTo ! ShardRegion.ClusterShardingStats(Map.empty) + } + } else { + replyTo ! ShardRegion.ClusterShardingStats(Map.empty) + } + Behaviors.same + } + } } - } + .onFailure(SupervisorStrategy.restart) + } } diff --git a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingStatsSpec.scala b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingStatsSpec.scala index 7f55998e1d..d441b29a08 100644 --- a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingStatsSpec.scala +++ b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingStatsSpec.scala @@ -4,18 +4,23 @@ package akka.cluster.sharding.typed -import akka.actor.testkit.typed.scaladsl.TestProbe -import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.{ ActorRef, Behavior } -import akka.cluster.MultiNodeClusterSpec -import akka.cluster.sharding.ShardRegion.ClusterShardingStats -import akka.cluster.sharding.typed.scaladsl.{ ClusterSharding, Entity, EntityTypeKey } -import akka.cluster.typed.MultiNodeTypedClusterSpec -import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec } -import akka.serialization.jackson.CborSerializable import com.typesafe.config.ConfigFactory import org.scalatest.concurrent.ScalaFutures +import akka.actor.testkit.typed.scaladsl.TestProbe +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.Behaviors +import akka.cluster.MultiNodeClusterSpec +import akka.cluster.sharding.ShardRegion.ClusterShardingStats +import akka.cluster.sharding.typed.scaladsl.ClusterSharding +import akka.cluster.sharding.typed.scaladsl.Entity +import akka.cluster.sharding.typed.scaladsl.EntityTypeKey +import akka.cluster.typed.MultiNodeTypedClusterSpec +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.serialization.jackson.CborSerializable + object ClusterShardingStatsSpecConfig extends MultiNodeConfig { val first = role("first") @@ -57,15 +62,11 @@ abstract class ClusterShardingStatsSpec import ClusterShardingStatsSpecConfig._ import Pinger._ - val typeKey = EntityTypeKey[Command]("ping") + private val typeKey = EntityTypeKey[Command]("ping") - val entityId = "ping-1" - - val sharding = ClusterSharding(typedSystem) - - val settings = ClusterShardingSettings(typedSystem) - - val queryTimeout = settings.shardRegionQueryTimeout * roles.size.toLong //numeric widening y'all + private val sharding = ClusterSharding(typedSystem) + private val settings = ClusterShardingSettings(typedSystem) + private val queryTimeout = settings.shardRegionQueryTimeout * roles.size.toLong //numeric widening y'all "Cluster sharding stats" must { "form cluster" in { @@ -73,25 +74,30 @@ abstract class ClusterShardingStatsSpec } "get shard stats" in { - sharding.init(Entity(typeKey)(_ => Pinger())) enterBarrier("sharding started") - import akka.actor.typed.scaladsl.adapter._ - val entityRef = ClusterSharding(system.toTyped).entityRefFor(typeKey, entityId) - val pongProbe = TestProbe[Pong] + runOn(first) { + val pongProbe = TestProbe[Pong]() - entityRef ! Ping(1, pongProbe.ref) - pongProbe.expectMessageType[Pong] + val entityRef1 = ClusterSharding(typedSystem).entityRefFor(typeKey, "ping-1") + entityRef1 ! Ping(1, pongProbe.ref) + pongProbe.receiveMessage() + + val entityRef2 = ClusterSharding(typedSystem).entityRefFor(typeKey, "ping-2") + entityRef2 ! Ping(2, pongProbe.ref) + pongProbe.receiveMessage() + } enterBarrier("sharding-initialized") runOn(first, second, third) { val replyToProbe = TestProbe[ClusterShardingStats]() - sharding.shardState ! GetClusterShardingStats(queryTimeout, replyToProbe.ref) + sharding.shardState ! GetClusterShardingStats(typeKey, queryTimeout, replyToProbe.ref) - val stats = replyToProbe.expectMessageType[ClusterShardingStats](queryTimeout) + val stats = replyToProbe.receiveMessage(queryTimeout) stats.regions.size shouldEqual 3 - stats.regions.values.flatMap(_.stats.values).sum shouldEqual 1 + stats.regions.values.flatMap(_.stats.values).sum shouldEqual 2 + stats.regions.values.forall(_.failed.isEmpty) shouldBe true } enterBarrier("done") diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java index bd72ed5879..450e56db6b 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java @@ -24,6 +24,17 @@ import akka.persistence.typed.PersistenceId; // #import +// #get-shard-region-state +import akka.cluster.sharding.typed.GetShardRegionState; +import akka.cluster.sharding.ShardRegion.CurrentShardRegionState; + +// #get-shard-region-state +// #get-cluster-sharding-stats +import akka.cluster.sharding.typed.GetClusterShardingStats; +import akka.cluster.sharding.ShardRegion.ClusterShardingStats; + +// #get-cluster-sharding-stats + import jdocs.akka.persistence.typed.BlogPostEntity; interface ShardingCompileOnlyTest { @@ -248,4 +259,31 @@ interface ShardingCompileOnlyTest { ClusterSharding.get(system).entityRefFor(typeKey, entityId, "dc2"); // #proxy-dc-entityref } + + public static void shardRegionQqueryExample() { + ActorSystem system = ActorSystem.create(Behaviors.empty(), "ShardingExample"); + ActorRef replyMessageAdapter = null; + EntityTypeKey typeKey = EntityTypeKey.create(Counter.Command.class, "Counter"); + + // #get-shard-region-state + ActorRef replyTo = replyMessageAdapter; + + ClusterSharding.get(system).shardState().tell(new GetShardRegionState(typeKey, replyTo)); + // #get-shard-region-state + } + + public static void shardingStatsQqueryExample() { + ActorSystem system = ActorSystem.create(Behaviors.empty(), "ShardingExample"); + ActorRef replyMessageAdapter = null; + EntityTypeKey typeKey = EntityTypeKey.create(Counter.Command.class, "Counter"); + + // #get-cluster-sharding-stats + ActorRef replyTo = replyMessageAdapter; + Duration timeout = Duration.ofSeconds(5); + + ClusterSharding.get(system) + .shardState() + .tell(new GetClusterShardingStats(typeKey, timeout, replyTo)); + // #get-cluster-sharding-stats + } } diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingStateSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingStateSpec.scala index 558a3413c3..f47ccdd52c 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingStateSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingStateSpec.scala @@ -19,19 +19,19 @@ class ClusterShardingStateSpec with AnyWordSpecLike with LogCapturing { - val sharding = ClusterSharding(system) + private val sharding = ClusterSharding(system) - val shardExtractor = ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) { + private val shardExtractor = ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) { case IdReplyPlz(id, _) => id case IdWhoAreYou(id, _) => id case other => throw new IllegalArgumentException(s"Unexpected message $other") } - val cluster = Cluster(system) + private val cluster = Cluster(system) val typeKey: EntityTypeKey[IdTestProtocol] = ClusterShardingSpec.typeKeyWithoutEnvelopes - "Cluster Sharding" must { + "Cluster Sharding CurrentShardRegionState query" must { "allow querying of the shard region state" in { val probe = TestProbe[CurrentShardRegionState]() cluster.manager ! Join(cluster.selfMember.address) diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingStatsSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingStatsSpec.scala index c971828a23..628d2f7a54 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingStatsSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingStatsSpec.scala @@ -4,67 +4,71 @@ package akka.cluster.sharding.typed.scaladsl -import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit, TestProbe } -import akka.actor.typed.ActorRef -import akka.cluster.sharding.ShardRegion.{ ClusterShardingStats, ShardRegionStats } -import akka.cluster.sharding.typed.scaladsl.ClusterShardingSpec._ -import akka.cluster.sharding.typed.{ ClusterShardingSettings, GetClusterShardingStats } -import akka.cluster.typed.{ Cluster, Join, SelfUp } import org.scalatest.wordspec.AnyWordSpecLike +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.ActorRef +import akka.cluster.sharding.ShardRegion.ClusterShardingStats +import akka.cluster.sharding.ShardRegion.ShardRegionStats +import akka.cluster.sharding.typed.ClusterShardingSettings +import akka.cluster.sharding.typed.GetClusterShardingStats +import akka.cluster.sharding.typed.scaladsl.ClusterShardingSpec._ +import akka.cluster.typed.Cluster +import akka.cluster.typed.Join +import akka.cluster.typed.SelfUp + class ClusterShardingStatsSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.config) with AnyWordSpecLike with LogCapturing { - val sharding = ClusterSharding(system) + private val sharding = ClusterSharding(system) - val typeKey: EntityTypeKey[IdTestProtocol] = ClusterShardingSpec.typeKeyWithoutEnvelopes + private val typeKey: EntityTypeKey[IdTestProtocol] = ClusterShardingSpec.typeKeyWithoutEnvelopes - val shardExtractor = ClusterShardingSpec.idTestProtocolMessageExtractor + private val shardExtractor = ClusterShardingSpec.idTestProtocolMessageExtractor // no need to scale this up here for the cluster query versus one region - val queryTimeout = ClusterShardingSettings(system).shardRegionQueryTimeout + private val queryTimeout = ClusterShardingSettings(system).shardRegionQueryTimeout - "Cluster Sharding" must { + "Cluster Sharding ClusterShardingStats query" must { "return empty statistics if there are no running sharded entities" in { val cluster = Cluster(system) - val upProbe = TestProbe[SelfUp]() + val upProbe = createTestProbe[SelfUp]() cluster.subscriptions ! akka.cluster.typed.Subscribe(upProbe.ref, classOf[SelfUp]) cluster.manager ! Join(cluster.selfMember.address) upProbe.expectMessageType[SelfUp] - val emptyProbe = TestProbe[ClusterShardingStats]() - sharding.shardState ! GetClusterShardingStats(queryTimeout, emptyProbe.ref) - emptyProbe.expectMessage(ClusterShardingStats(Map.empty)) + val replyProbe = createTestProbe[ClusterShardingStats]() + sharding.shardState ! GetClusterShardingStats(typeKey, queryTimeout, replyProbe.ref) + replyProbe.expectMessage(ClusterShardingStats(Map.empty)) } "allow querying of statistics of the currently running sharded entities in the entire cluster" in { - val shardingRef: ActorRef[IdTestProtocol] = sharding.init( Entity(typeKey)(_ => ClusterShardingSpec.behaviorWithId()) .withStopMessage(IdStopPlz()) .withMessageExtractor(idTestProtocolMessageExtractor)) - val replyProbe = TestProbe[String]() + val replyProbe = createTestProbe[String]() val id1 = "id1" shardingRef ! IdReplyPlz(id1, replyProbe.ref) replyProbe.expectMessage("Hello!") - val replyToProbe = TestProbe[ClusterShardingStats]() + val replyToProbe = createTestProbe[ClusterShardingStats]() val replyTo = replyToProbe.ref - //#get-cluster-sharding-stats - ClusterSharding(system).shardState ! GetClusterShardingStats(queryTimeout, replyTo) - val stats = replyToProbe.expectMessageType[ClusterShardingStats] - //#get-cluster-sharding-stats + ClusterSharding(system).shardState ! GetClusterShardingStats(typeKey, queryTimeout, replyTo) + val stats = replyToProbe.receiveMessage() val expect = ClusterShardingStats( Map(Cluster(system).selfMember.address -> ShardRegionStats(Map(shardExtractor.shardId(id1) -> 1), Set.empty))) stats shouldEqual expect } + } } diff --git a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala index c6c8e8e52e..7af737140e 100644 --- a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala @@ -14,6 +14,9 @@ import com.github.ghik.silencer.silent import docs.akka.persistence.typed.BlogPostEntity import docs.akka.persistence.typed.BlogPostEntity.Command +import akka.cluster.sharding.typed.scaladsl.ClusterSharding +import akka.cluster.sharding.typed.scaladsl.EntityTypeKey + @silent object ShardingCompileOnlySpec { @@ -169,4 +172,42 @@ object ShardingCompileOnlySpec { //#sharded-response } + object ShardRegionStateQuery { + + object Counter { + val TypeKey = EntityTypeKey[Basics.Counter.Command]("Counter") + } + + val replyMessageAdapter: ActorRef[akka.cluster.sharding.ShardRegion.CurrentShardRegionState] = ??? + + //#get-shard-region-state + import akka.cluster.sharding.typed.GetShardRegionState + import akka.cluster.sharding.ShardRegion.CurrentShardRegionState + + val replyTo: ActorRef[CurrentShardRegionState] = replyMessageAdapter + + ClusterSharding(system).shardState ! GetShardRegionState(Counter.TypeKey, replyTo) + //#get-shard-region-state + } + + object ClusterShardingStatsQuery { + + object Counter { + val TypeKey = EntityTypeKey[Basics.Counter.Command]("Counter") + } + + val replyMessageAdapter: ActorRef[akka.cluster.sharding.ShardRegion.ClusterShardingStats] = ??? + + //#get-cluster-sharding-stats + import akka.cluster.sharding.typed.GetClusterShardingStats + import akka.cluster.sharding.ShardRegion.ClusterShardingStats + import scala.concurrent.duration._ + + val replyTo: ActorRef[ClusterShardingStats] = replyMessageAdapter + val timeout: FiniteDuration = 5.seconds + + ClusterSharding(system).shardState ! GetClusterShardingStats(Counter.TypeKey, timeout, replyTo) + //#get-cluster-sharding-stats + } + } diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index 54cf382c8e..42be1906a1 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -223,6 +223,7 @@ object ShardRegion { /** * Send this message to the `ShardRegion` actor to request for [[CurrentRegions]], * which contains the addresses of all registered regions. + * * Intended for testing purpose to see when cluster sharding is "ready" or to monitor * the state of the shard regions. */ @@ -281,6 +282,7 @@ object ShardRegion { * Send this message to the `ShardRegion` actor to request for [[ShardRegionStats]], * which contains statistics about the currently running sharded entities in the * entire region. + * * Intended for testing purpose to see when cluster sharding is "ready" or to monitor * the state of the shard regions. * diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala index 8d2d1fbfa2..25ccdbd0d5 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala @@ -6,11 +6,12 @@ package akka.cluster.sharding import scala.concurrent.duration._ -import com.typesafe.config.ConfigFactory - import akka.actor._ -import akka.cluster.{ Cluster, MemberStatus } -import akka.testkit.{ TestDuration, TestProbe } +import akka.cluster.Cluster +import akka.cluster.MemberStatus +import akka.testkit.TestDuration +import akka.testkit.TestProbe +import com.typesafe.config.ConfigFactory object ClusterShardingGetStatsSpec { import MultiNodeClusterShardingSpec.PingPongActor diff --git a/akka-docs/src/main/paradox/typed/cluster-sharding.md b/akka-docs/src/main/paradox/typed/cluster-sharding.md index 3e1aa3e4d1..655a1e6fb8 100644 --- a/akka-docs/src/main/paradox/typed/cluster-sharding.md +++ b/akka-docs/src/main/paradox/typed/cluster-sharding.md @@ -374,6 +374,36 @@ rebalanced to other nodes. See @ref:[How To Startup when Cluster Size Reached](cluster.md#how-to-startup-when-a-cluster-size-is-reached) for more information about `min-nr-of-members`. +## Inspecting cluster sharding state + +Two requests to inspect the cluster state are available: + +@apidoc[akka.cluster.sharding.typed.GetShardRegionState] which will reply with a +@apidoc[ShardRegion.CurrentShardRegionState] that contains the identifiers of the shards running in +a Region and what entities are alive for each of them. + +Scala +: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #get-shard-region-state } + +Java +: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #get-shard-region-state } + +@apidoc[akka.cluster.sharding.typed.GetClusterShardingStats] which will query all the regions in the cluster and reply with a +@apidoc[ShardRegion.ClusterShardingStats] containing the identifiers of the shards running in each region and a count +of entities that are alive in each shard. + +Scala +: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #get-cluster-sharding-stats } + +Java +: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #get-cluster-sharding-stats } + +If any shard queries failed, for example due to timeout if a shard was too busy to reply within the configured `akka.cluster.sharding.shard-region-query-timeout`, +`ShardRegion.CurrentShardRegionState` and `ShardRegion.ClusterShardingStats` will also include the set of shard identifiers by region that failed. + +The purpose of these messages is testing and monitoring, they are not provided to give access to +directly sending messages to the individual entities. + ## Lease A @ref[lease](../coordination.md) can be used as an additional safety measure to ensure a shard