From c0e31978e61629ab5b6ba42ddb4c309eb7f95004 Mon Sep 17 00:00:00 2001 From: Helena Edelson Date: Fri, 28 Feb 2020 15:10:47 -0800 Subject: [PATCH 1/2] Support GetClusterShardingStats query in Typed #24466 --- .../sharding/typed/ClusterShardingQuery.scala | 27 ++++- .../typed/internal/ShardingState.scala | 55 ++++++++-- .../typed/ClusterShardingStatsSpec.scala | 102 ++++++++++++++++++ .../scaladsl/ClusterShardingStatsSpec.scala | 70 ++++++++++++ 4 files changed, 242 insertions(+), 12 deletions(-) create mode 100644 akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingStatsSpec.scala create mode 100644 akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingStatsSpec.scala diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingQuery.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingQuery.scala index 29a016eaff..576db38c9e 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingQuery.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingQuery.scala @@ -5,8 +5,11 @@ package akka.cluster.sharding.typed import akka.actor.typed.ActorRef -import akka.cluster.sharding.ShardRegion.CurrentShardRegionState +import akka.cluster.sharding.ShardRegion.{ ClusterShardingStats, CurrentShardRegionState } import akka.cluster.sharding.typed.scaladsl.EntityTypeKey +import akka.util.JavaDurationConverters + +import scala.concurrent.duration.FiniteDuration /** * Protocol for querying sharding state e.g. A ShardRegion's state @@ -30,4 +33,24 @@ final case class GetShardRegionState(entityTypeKey: EntityTypeKey[_], replyTo: A this(entityTypeKey.asScala, replyTo) } -// TODO - GetClusterShardingStats +/** + * Query the statistics about the currently running sharded entities in the + * entire cluster. If the given `timeout` is reached without answers from all + * shard regions the reply will contain an empty map of regions. + * + * @param timeout the timeout applied to querying all alive regions + * @param replyTo the actor to send the result to + */ +final case class GetClusterShardingStats(timeout: FiniteDuration, replyTo: ActorRef[ClusterShardingStats]) + extends ClusterShardingQuery { + + /** + * Java API + * + * Query the statistics about the currently running sharded entities in the + * entire cluster. If the given `timeout` is reached without answers from all + * shard regions the reply will contain an empty map of regions. + */ + def this(timeout: java.time.Duration, replyTo: ActorRef[ClusterShardingStats]) = + this(JavaDurationConverters.asFiniteDuration(timeout), replyTo) +} diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardingState.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardingState.scala index 1f08f0c1c1..eec7b4f7ee 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardingState.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardingState.scala @@ -3,27 +3,62 @@ */ package akka.cluster.sharding.typed.internal + +import scala.concurrent.Future + import akka.actor.typed.Behavior import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.adapter._ import akka.annotation.InternalApi -import akka.cluster.sharding.{ ClusterSharding, ShardRegion } import akka.cluster.sharding.ShardRegion.CurrentShardRegionState -import akka.cluster.sharding.typed.{ ClusterShardingQuery, GetShardRegionState } +import akka.cluster.sharding.typed.ClusterShardingQuery +import akka.cluster.sharding.typed.GetClusterShardingStats +import akka.cluster.sharding.typed.GetShardRegionState +import akka.cluster.sharding.ClusterSharding +import akka.cluster.sharding.ShardRegion +import akka.pattern.AskTimeoutException +import akka.util.Timeout /** * INTERNAL API */ @InternalApi private[akka] object ShardingState { - def behavior(classicSharding: ClusterSharding): Behavior[ClusterShardingQuery] = Behaviors.receiveMessage { - case GetShardRegionState(key, replyTo) => - if (classicSharding.getShardTypeNames.contains(key.name)) { - classicSharding.shardRegion(key.name).tell(ShardRegion.GetShardRegionState, replyTo.toClassic) - } else { - replyTo ! CurrentShardRegionState(Set.empty) + def behavior(classicSharding: ClusterSharding): Behavior[ClusterShardingQuery] = + Behaviors.setup { context => + Behaviors.receiveMessage { + case GetShardRegionState(key, replyTo) => + if (classicSharding.getShardTypeNames.contains(key.name)) { + classicSharding.shardRegion(key.name).tell(ShardRegion.GetShardRegionState, replyTo.toClassic) + } else { + replyTo ! CurrentShardRegionState(Set.empty) + } + Behaviors.same + + case GetClusterShardingStats(timeout, replyTo) => + import akka.pattern.ask + import akka.pattern.pipe + implicit val t: Timeout = timeout + implicit val ec = context.system.executionContext + + val regions = classicSharding.shardTypeNames.map(classicSharding.shardRegion) + + if (regions.nonEmpty) { + Future + .firstCompletedOf(regions.map { region => + (region ? ShardRegion.GetClusterShardingStats(timeout)).mapTo[ShardRegion.ClusterShardingStats] + }) + .recover { + case _: AskTimeoutException => + ShardRegion.ClusterShardingStats(Map.empty) + } + .pipeTo(replyTo.toClassic) + } else { + replyTo ! ShardRegion.ClusterShardingStats(Map.empty) + } + + Behaviors.same } - Behaviors.same - } + } } diff --git a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingStatsSpec.scala b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingStatsSpec.scala new file mode 100644 index 0000000000..7f55998e1d --- /dev/null +++ b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingStatsSpec.scala @@ -0,0 +1,102 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.cluster.sharding.typed + +import akka.actor.testkit.typed.scaladsl.TestProbe +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.{ ActorRef, Behavior } +import akka.cluster.MultiNodeClusterSpec +import akka.cluster.sharding.ShardRegion.ClusterShardingStats +import akka.cluster.sharding.typed.scaladsl.{ ClusterSharding, Entity, EntityTypeKey } +import akka.cluster.typed.MultiNodeTypedClusterSpec +import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec } +import akka.serialization.jackson.CborSerializable +import com.typesafe.config.ConfigFactory +import org.scalatest.concurrent.ScalaFutures + +object ClusterShardingStatsSpecConfig extends MultiNodeConfig { + + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig(ConfigFactory.parseString(""" + akka.log-dead-letters-during-shutdown = off + akka.cluster.sharding.updating-state-timeout = 2s + akka.cluster.sharding.waiting-for-state-timeout = 2s + """).withFallback(MultiNodeClusterSpec.clusterConfig)) + +} + +class ClusterShardingStatsSpecMultiJvmNode1 extends ClusterShardingStatsSpec +class ClusterShardingStatsSpecMultiJvmNode2 extends ClusterShardingStatsSpec +class ClusterShardingStatsSpecMultiJvmNode3 extends ClusterShardingStatsSpec + +object Pinger { + sealed trait Command extends CborSerializable + case class Ping(id: Int, ref: ActorRef[Pong]) extends Command + case class Pong(id: Int) extends CborSerializable + + def apply(): Behavior[Command] = { + Behaviors.receiveMessage[Command] { + case Ping(id: Int, ref) => + ref ! Pong(id) + Behaviors.same + } + } + +} + +abstract class ClusterShardingStatsSpec + extends MultiNodeSpec(ClusterShardingStatsSpecConfig) + with MultiNodeTypedClusterSpec + with ScalaFutures { + + import ClusterShardingStatsSpecConfig._ + import Pinger._ + + val typeKey = EntityTypeKey[Command]("ping") + + val entityId = "ping-1" + + val sharding = ClusterSharding(typedSystem) + + val settings = ClusterShardingSettings(typedSystem) + + val queryTimeout = settings.shardRegionQueryTimeout * roles.size.toLong //numeric widening y'all + + "Cluster sharding stats" must { + "form cluster" in { + formCluster(first, second, third) + } + + "get shard stats" in { + + sharding.init(Entity(typeKey)(_ => Pinger())) + enterBarrier("sharding started") + + import akka.actor.typed.scaladsl.adapter._ + val entityRef = ClusterSharding(system.toTyped).entityRefFor(typeKey, entityId) + val pongProbe = TestProbe[Pong] + + entityRef ! Ping(1, pongProbe.ref) + pongProbe.expectMessageType[Pong] + enterBarrier("sharding-initialized") + + runOn(first, second, third) { + val replyToProbe = TestProbe[ClusterShardingStats]() + sharding.shardState ! GetClusterShardingStats(queryTimeout, replyToProbe.ref) + + val stats = replyToProbe.expectMessageType[ClusterShardingStats](queryTimeout) + stats.regions.size shouldEqual 3 + stats.regions.values.flatMap(_.stats.values).sum shouldEqual 1 + } + enterBarrier("done") + + } + + } + +} diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingStatsSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingStatsSpec.scala new file mode 100644 index 0000000000..c971828a23 --- /dev/null +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingStatsSpec.scala @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.cluster.sharding.typed.scaladsl + +import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit, TestProbe } +import akka.actor.typed.ActorRef +import akka.cluster.sharding.ShardRegion.{ ClusterShardingStats, ShardRegionStats } +import akka.cluster.sharding.typed.scaladsl.ClusterShardingSpec._ +import akka.cluster.sharding.typed.{ ClusterShardingSettings, GetClusterShardingStats } +import akka.cluster.typed.{ Cluster, Join, SelfUp } +import org.scalatest.wordspec.AnyWordSpecLike + +class ClusterShardingStatsSpec + extends ScalaTestWithActorTestKit(ClusterShardingSpec.config) + with AnyWordSpecLike + with LogCapturing { + + val sharding = ClusterSharding(system) + + val typeKey: EntityTypeKey[IdTestProtocol] = ClusterShardingSpec.typeKeyWithoutEnvelopes + + val shardExtractor = ClusterShardingSpec.idTestProtocolMessageExtractor + + // no need to scale this up here for the cluster query versus one region + val queryTimeout = ClusterShardingSettings(system).shardRegionQueryTimeout + + "Cluster Sharding" must { + "return empty statistics if there are no running sharded entities" in { + val cluster = Cluster(system) + val upProbe = TestProbe[SelfUp]() + + cluster.subscriptions ! akka.cluster.typed.Subscribe(upProbe.ref, classOf[SelfUp]) + cluster.manager ! Join(cluster.selfMember.address) + upProbe.expectMessageType[SelfUp] + + val emptyProbe = TestProbe[ClusterShardingStats]() + sharding.shardState ! GetClusterShardingStats(queryTimeout, emptyProbe.ref) + emptyProbe.expectMessage(ClusterShardingStats(Map.empty)) + } + + "allow querying of statistics of the currently running sharded entities in the entire cluster" in { + + val shardingRef: ActorRef[IdTestProtocol] = sharding.init( + Entity(typeKey)(_ => ClusterShardingSpec.behaviorWithId()) + .withStopMessage(IdStopPlz()) + .withMessageExtractor(idTestProtocolMessageExtractor)) + + val replyProbe = TestProbe[String]() + val id1 = "id1" + shardingRef ! IdReplyPlz(id1, replyProbe.ref) + replyProbe.expectMessage("Hello!") + + val replyToProbe = TestProbe[ClusterShardingStats]() + val replyTo = replyToProbe.ref + + //#get-cluster-sharding-stats + ClusterSharding(system).shardState ! GetClusterShardingStats(queryTimeout, replyTo) + val stats = replyToProbe.expectMessageType[ClusterShardingStats] + //#get-cluster-sharding-stats + + val expect = ClusterShardingStats( + Map(Cluster(system).selfMember.address -> ShardRegionStats(Map(shardExtractor.shardId(id1) -> 1), Set.empty))) + + stats shouldEqual expect + } + } + +} From e01396843eb07de0a82ac2d0593b0e8156b2fff6 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 28 Apr 2020 19:21:01 +0200 Subject: [PATCH 2/2] Complete GetClusterShardingStats query, #24466 --- .../sharding/typed/ClusterShardingQuery.scala | 27 +++++-- .../typed/internal/ShardingState.scala | 80 ++++++++++--------- .../typed/ClusterShardingStatsSpec.scala | 58 ++++++++------ .../typed/ShardingCompileOnlyTest.java | 38 +++++++++ .../scaladsl/ClusterShardingStateSpec.scala | 8 +- .../scaladsl/ClusterShardingStatsSpec.scala | 48 ++++++----- .../typed/ShardingCompileOnlySpec.scala | 41 ++++++++++ .../akka/cluster/sharding/ShardRegion.scala | 2 + .../ClusterShardingGetStatsSpec.scala | 9 ++- .../main/paradox/typed/cluster-sharding.md | 30 +++++++ 10 files changed, 240 insertions(+), 101 deletions(-) diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingQuery.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingQuery.scala index 576db38c9e..2b5037d868 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingQuery.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingQuery.scala @@ -4,13 +4,14 @@ package akka.cluster.sharding.typed +import scala.concurrent.duration.FiniteDuration + import akka.actor.typed.ActorRef -import akka.cluster.sharding.ShardRegion.{ ClusterShardingStats, CurrentShardRegionState } +import akka.cluster.sharding.ShardRegion.ClusterShardingStats +import akka.cluster.sharding.ShardRegion.CurrentShardRegionState import akka.cluster.sharding.typed.scaladsl.EntityTypeKey import akka.util.JavaDurationConverters -import scala.concurrent.duration.FiniteDuration - /** * Protocol for querying sharding state e.g. A ShardRegion's state */ @@ -19,6 +20,11 @@ sealed trait ClusterShardingQuery /** * Query the ShardRegion state for the given entity type key. This will get the state of the * local ShardRegion's state. + * + * Intended for testing purpose to see when cluster sharding is "ready" or to monitor + * the state of the shard regions. + * + * For the statistics for the entire cluster, see [[GetClusterShardingStats]]. */ final case class GetShardRegionState(entityTypeKey: EntityTypeKey[_], replyTo: ActorRef[CurrentShardRegionState]) extends ClusterShardingQuery { @@ -38,10 +44,16 @@ final case class GetShardRegionState(entityTypeKey: EntityTypeKey[_], replyTo: A * entire cluster. If the given `timeout` is reached without answers from all * shard regions the reply will contain an empty map of regions. * + * Intended for testing purpose to see when cluster sharding is "ready" or to monitor + * the state of the shard regions. + * * @param timeout the timeout applied to querying all alive regions * @param replyTo the actor to send the result to */ -final case class GetClusterShardingStats(timeout: FiniteDuration, replyTo: ActorRef[ClusterShardingStats]) +final case class GetClusterShardingStats( + entityTypeKey: EntityTypeKey[_], + timeout: FiniteDuration, + replyTo: ActorRef[ClusterShardingStats]) extends ClusterShardingQuery { /** @@ -51,6 +63,9 @@ final case class GetClusterShardingStats(timeout: FiniteDuration, replyTo: Actor * entire cluster. If the given `timeout` is reached without answers from all * shard regions the reply will contain an empty map of regions. */ - def this(timeout: java.time.Duration, replyTo: ActorRef[ClusterShardingStats]) = - this(JavaDurationConverters.asFiniteDuration(timeout), replyTo) + def this( + entityTypeKey: javadsl.EntityTypeKey[_], + timeout: java.time.Duration, + replyTo: ActorRef[ClusterShardingStats]) = + this(entityTypeKey.asScala, JavaDurationConverters.asFiniteDuration(timeout), replyTo) } diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardingState.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardingState.scala index eec7b4f7ee..ec74d2b7c9 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardingState.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardingState.scala @@ -4,61 +4,63 @@ package akka.cluster.sharding.typed.internal -import scala.concurrent.Future - import akka.actor.typed.Behavior +import akka.actor.typed.SupervisorStrategy import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.adapter._ import akka.annotation.InternalApi +import akka.cluster.sharding.ClusterSharding +import akka.cluster.sharding.ShardRegion import akka.cluster.sharding.ShardRegion.CurrentShardRegionState import akka.cluster.sharding.typed.ClusterShardingQuery import akka.cluster.sharding.typed.GetClusterShardingStats import akka.cluster.sharding.typed.GetShardRegionState -import akka.cluster.sharding.ClusterSharding -import akka.cluster.sharding.ShardRegion -import akka.pattern.AskTimeoutException -import akka.util.Timeout /** * INTERNAL API */ @InternalApi private[akka] object ShardingState { - def behavior(classicSharding: ClusterSharding): Behavior[ClusterShardingQuery] = - Behaviors.setup { context => - Behaviors.receiveMessage { - case GetShardRegionState(key, replyTo) => - if (classicSharding.getShardTypeNames.contains(key.name)) { - classicSharding.shardRegion(key.name).tell(ShardRegion.GetShardRegionState, replyTo.toClassic) - } else { - replyTo ! CurrentShardRegionState(Set.empty) - } - Behaviors.same - - case GetClusterShardingStats(timeout, replyTo) => - import akka.pattern.ask - import akka.pattern.pipe - implicit val t: Timeout = timeout - implicit val ec = context.system.executionContext - - val regions = classicSharding.shardTypeNames.map(classicSharding.shardRegion) - - if (regions.nonEmpty) { - Future - .firstCompletedOf(regions.map { region => - (region ? ShardRegion.GetClusterShardingStats(timeout)).mapTo[ShardRegion.ClusterShardingStats] - }) - .recover { - case _: AskTimeoutException => - ShardRegion.ClusterShardingStats(Map.empty) + def behavior(classicSharding: ClusterSharding): Behavior[ClusterShardingQuery] = { + Behaviors + .supervise[ClusterShardingQuery] { + Behaviors.setup { context => + Behaviors.receiveMessage { + case GetShardRegionState(key, replyTo) => + if (classicSharding.getShardTypeNames.contains(key.name)) { + try { + classicSharding.shardRegion(key.name).tell(ShardRegion.GetShardRegionState, replyTo.toClassic) + } catch { + case e: IllegalStateException => + // classicSharding.shardRegion may throw if not initialized + context.log.warn(e.getMessage) + replyTo ! CurrentShardRegionState(Set.empty) + } + } else { + replyTo ! CurrentShardRegionState(Set.empty) } - .pipeTo(replyTo.toClassic) - } else { - replyTo ! ShardRegion.ClusterShardingStats(Map.empty) - } + Behaviors.same - Behaviors.same + case GetClusterShardingStats(key, timeout, replyTo) => + if (classicSharding.getShardTypeNames.contains(key.name)) { + try { + classicSharding + .shardRegion(key.name) + .tell(ShardRegion.GetClusterShardingStats(timeout), replyTo.toClassic) + } catch { + case e: IllegalStateException => + // classicSharding.shardRegion may throw if not initialized + context.log.warn(e.getMessage) + replyTo ! ShardRegion.ClusterShardingStats(Map.empty) + } + } else { + replyTo ! ShardRegion.ClusterShardingStats(Map.empty) + } + Behaviors.same + } + } } - } + .onFailure(SupervisorStrategy.restart) + } } diff --git a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingStatsSpec.scala b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingStatsSpec.scala index 7f55998e1d..d441b29a08 100644 --- a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingStatsSpec.scala +++ b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingStatsSpec.scala @@ -4,18 +4,23 @@ package akka.cluster.sharding.typed -import akka.actor.testkit.typed.scaladsl.TestProbe -import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.{ ActorRef, Behavior } -import akka.cluster.MultiNodeClusterSpec -import akka.cluster.sharding.ShardRegion.ClusterShardingStats -import akka.cluster.sharding.typed.scaladsl.{ ClusterSharding, Entity, EntityTypeKey } -import akka.cluster.typed.MultiNodeTypedClusterSpec -import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec } -import akka.serialization.jackson.CborSerializable import com.typesafe.config.ConfigFactory import org.scalatest.concurrent.ScalaFutures +import akka.actor.testkit.typed.scaladsl.TestProbe +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.Behaviors +import akka.cluster.MultiNodeClusterSpec +import akka.cluster.sharding.ShardRegion.ClusterShardingStats +import akka.cluster.sharding.typed.scaladsl.ClusterSharding +import akka.cluster.sharding.typed.scaladsl.Entity +import akka.cluster.sharding.typed.scaladsl.EntityTypeKey +import akka.cluster.typed.MultiNodeTypedClusterSpec +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.serialization.jackson.CborSerializable + object ClusterShardingStatsSpecConfig extends MultiNodeConfig { val first = role("first") @@ -57,15 +62,11 @@ abstract class ClusterShardingStatsSpec import ClusterShardingStatsSpecConfig._ import Pinger._ - val typeKey = EntityTypeKey[Command]("ping") + private val typeKey = EntityTypeKey[Command]("ping") - val entityId = "ping-1" - - val sharding = ClusterSharding(typedSystem) - - val settings = ClusterShardingSettings(typedSystem) - - val queryTimeout = settings.shardRegionQueryTimeout * roles.size.toLong //numeric widening y'all + private val sharding = ClusterSharding(typedSystem) + private val settings = ClusterShardingSettings(typedSystem) + private val queryTimeout = settings.shardRegionQueryTimeout * roles.size.toLong //numeric widening y'all "Cluster sharding stats" must { "form cluster" in { @@ -73,25 +74,30 @@ abstract class ClusterShardingStatsSpec } "get shard stats" in { - sharding.init(Entity(typeKey)(_ => Pinger())) enterBarrier("sharding started") - import akka.actor.typed.scaladsl.adapter._ - val entityRef = ClusterSharding(system.toTyped).entityRefFor(typeKey, entityId) - val pongProbe = TestProbe[Pong] + runOn(first) { + val pongProbe = TestProbe[Pong]() - entityRef ! Ping(1, pongProbe.ref) - pongProbe.expectMessageType[Pong] + val entityRef1 = ClusterSharding(typedSystem).entityRefFor(typeKey, "ping-1") + entityRef1 ! Ping(1, pongProbe.ref) + pongProbe.receiveMessage() + + val entityRef2 = ClusterSharding(typedSystem).entityRefFor(typeKey, "ping-2") + entityRef2 ! Ping(2, pongProbe.ref) + pongProbe.receiveMessage() + } enterBarrier("sharding-initialized") runOn(first, second, third) { val replyToProbe = TestProbe[ClusterShardingStats]() - sharding.shardState ! GetClusterShardingStats(queryTimeout, replyToProbe.ref) + sharding.shardState ! GetClusterShardingStats(typeKey, queryTimeout, replyToProbe.ref) - val stats = replyToProbe.expectMessageType[ClusterShardingStats](queryTimeout) + val stats = replyToProbe.receiveMessage(queryTimeout) stats.regions.size shouldEqual 3 - stats.regions.values.flatMap(_.stats.values).sum shouldEqual 1 + stats.regions.values.flatMap(_.stats.values).sum shouldEqual 2 + stats.regions.values.forall(_.failed.isEmpty) shouldBe true } enterBarrier("done") diff --git a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java index bd72ed5879..450e56db6b 100644 --- a/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java +++ b/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java @@ -24,6 +24,17 @@ import akka.persistence.typed.PersistenceId; // #import +// #get-shard-region-state +import akka.cluster.sharding.typed.GetShardRegionState; +import akka.cluster.sharding.ShardRegion.CurrentShardRegionState; + +// #get-shard-region-state +// #get-cluster-sharding-stats +import akka.cluster.sharding.typed.GetClusterShardingStats; +import akka.cluster.sharding.ShardRegion.ClusterShardingStats; + +// #get-cluster-sharding-stats + import jdocs.akka.persistence.typed.BlogPostEntity; interface ShardingCompileOnlyTest { @@ -248,4 +259,31 @@ interface ShardingCompileOnlyTest { ClusterSharding.get(system).entityRefFor(typeKey, entityId, "dc2"); // #proxy-dc-entityref } + + public static void shardRegionQqueryExample() { + ActorSystem system = ActorSystem.create(Behaviors.empty(), "ShardingExample"); + ActorRef replyMessageAdapter = null; + EntityTypeKey typeKey = EntityTypeKey.create(Counter.Command.class, "Counter"); + + // #get-shard-region-state + ActorRef replyTo = replyMessageAdapter; + + ClusterSharding.get(system).shardState().tell(new GetShardRegionState(typeKey, replyTo)); + // #get-shard-region-state + } + + public static void shardingStatsQqueryExample() { + ActorSystem system = ActorSystem.create(Behaviors.empty(), "ShardingExample"); + ActorRef replyMessageAdapter = null; + EntityTypeKey typeKey = EntityTypeKey.create(Counter.Command.class, "Counter"); + + // #get-cluster-sharding-stats + ActorRef replyTo = replyMessageAdapter; + Duration timeout = Duration.ofSeconds(5); + + ClusterSharding.get(system) + .shardState() + .tell(new GetClusterShardingStats(typeKey, timeout, replyTo)); + // #get-cluster-sharding-stats + } } diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingStateSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingStateSpec.scala index 558a3413c3..f47ccdd52c 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingStateSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingStateSpec.scala @@ -19,19 +19,19 @@ class ClusterShardingStateSpec with AnyWordSpecLike with LogCapturing { - val sharding = ClusterSharding(system) + private val sharding = ClusterSharding(system) - val shardExtractor = ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) { + private val shardExtractor = ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) { case IdReplyPlz(id, _) => id case IdWhoAreYou(id, _) => id case other => throw new IllegalArgumentException(s"Unexpected message $other") } - val cluster = Cluster(system) + private val cluster = Cluster(system) val typeKey: EntityTypeKey[IdTestProtocol] = ClusterShardingSpec.typeKeyWithoutEnvelopes - "Cluster Sharding" must { + "Cluster Sharding CurrentShardRegionState query" must { "allow querying of the shard region state" in { val probe = TestProbe[CurrentShardRegionState]() cluster.manager ! Join(cluster.selfMember.address) diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingStatsSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingStatsSpec.scala index c971828a23..628d2f7a54 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingStatsSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingStatsSpec.scala @@ -4,67 +4,71 @@ package akka.cluster.sharding.typed.scaladsl -import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit, TestProbe } -import akka.actor.typed.ActorRef -import akka.cluster.sharding.ShardRegion.{ ClusterShardingStats, ShardRegionStats } -import akka.cluster.sharding.typed.scaladsl.ClusterShardingSpec._ -import akka.cluster.sharding.typed.{ ClusterShardingSettings, GetClusterShardingStats } -import akka.cluster.typed.{ Cluster, Join, SelfUp } import org.scalatest.wordspec.AnyWordSpecLike +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.ActorRef +import akka.cluster.sharding.ShardRegion.ClusterShardingStats +import akka.cluster.sharding.ShardRegion.ShardRegionStats +import akka.cluster.sharding.typed.ClusterShardingSettings +import akka.cluster.sharding.typed.GetClusterShardingStats +import akka.cluster.sharding.typed.scaladsl.ClusterShardingSpec._ +import akka.cluster.typed.Cluster +import akka.cluster.typed.Join +import akka.cluster.typed.SelfUp + class ClusterShardingStatsSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.config) with AnyWordSpecLike with LogCapturing { - val sharding = ClusterSharding(system) + private val sharding = ClusterSharding(system) - val typeKey: EntityTypeKey[IdTestProtocol] = ClusterShardingSpec.typeKeyWithoutEnvelopes + private val typeKey: EntityTypeKey[IdTestProtocol] = ClusterShardingSpec.typeKeyWithoutEnvelopes - val shardExtractor = ClusterShardingSpec.idTestProtocolMessageExtractor + private val shardExtractor = ClusterShardingSpec.idTestProtocolMessageExtractor // no need to scale this up here for the cluster query versus one region - val queryTimeout = ClusterShardingSettings(system).shardRegionQueryTimeout + private val queryTimeout = ClusterShardingSettings(system).shardRegionQueryTimeout - "Cluster Sharding" must { + "Cluster Sharding ClusterShardingStats query" must { "return empty statistics if there are no running sharded entities" in { val cluster = Cluster(system) - val upProbe = TestProbe[SelfUp]() + val upProbe = createTestProbe[SelfUp]() cluster.subscriptions ! akka.cluster.typed.Subscribe(upProbe.ref, classOf[SelfUp]) cluster.manager ! Join(cluster.selfMember.address) upProbe.expectMessageType[SelfUp] - val emptyProbe = TestProbe[ClusterShardingStats]() - sharding.shardState ! GetClusterShardingStats(queryTimeout, emptyProbe.ref) - emptyProbe.expectMessage(ClusterShardingStats(Map.empty)) + val replyProbe = createTestProbe[ClusterShardingStats]() + sharding.shardState ! GetClusterShardingStats(typeKey, queryTimeout, replyProbe.ref) + replyProbe.expectMessage(ClusterShardingStats(Map.empty)) } "allow querying of statistics of the currently running sharded entities in the entire cluster" in { - val shardingRef: ActorRef[IdTestProtocol] = sharding.init( Entity(typeKey)(_ => ClusterShardingSpec.behaviorWithId()) .withStopMessage(IdStopPlz()) .withMessageExtractor(idTestProtocolMessageExtractor)) - val replyProbe = TestProbe[String]() + val replyProbe = createTestProbe[String]() val id1 = "id1" shardingRef ! IdReplyPlz(id1, replyProbe.ref) replyProbe.expectMessage("Hello!") - val replyToProbe = TestProbe[ClusterShardingStats]() + val replyToProbe = createTestProbe[ClusterShardingStats]() val replyTo = replyToProbe.ref - //#get-cluster-sharding-stats - ClusterSharding(system).shardState ! GetClusterShardingStats(queryTimeout, replyTo) - val stats = replyToProbe.expectMessageType[ClusterShardingStats] - //#get-cluster-sharding-stats + ClusterSharding(system).shardState ! GetClusterShardingStats(typeKey, queryTimeout, replyTo) + val stats = replyToProbe.receiveMessage() val expect = ClusterShardingStats( Map(Cluster(system).selfMember.address -> ShardRegionStats(Map(shardExtractor.shardId(id1) -> 1), Set.empty))) stats shouldEqual expect } + } } diff --git a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala index c6c8e8e52e..7af737140e 100644 --- a/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala @@ -14,6 +14,9 @@ import com.github.ghik.silencer.silent import docs.akka.persistence.typed.BlogPostEntity import docs.akka.persistence.typed.BlogPostEntity.Command +import akka.cluster.sharding.typed.scaladsl.ClusterSharding +import akka.cluster.sharding.typed.scaladsl.EntityTypeKey + @silent object ShardingCompileOnlySpec { @@ -169,4 +172,42 @@ object ShardingCompileOnlySpec { //#sharded-response } + object ShardRegionStateQuery { + + object Counter { + val TypeKey = EntityTypeKey[Basics.Counter.Command]("Counter") + } + + val replyMessageAdapter: ActorRef[akka.cluster.sharding.ShardRegion.CurrentShardRegionState] = ??? + + //#get-shard-region-state + import akka.cluster.sharding.typed.GetShardRegionState + import akka.cluster.sharding.ShardRegion.CurrentShardRegionState + + val replyTo: ActorRef[CurrentShardRegionState] = replyMessageAdapter + + ClusterSharding(system).shardState ! GetShardRegionState(Counter.TypeKey, replyTo) + //#get-shard-region-state + } + + object ClusterShardingStatsQuery { + + object Counter { + val TypeKey = EntityTypeKey[Basics.Counter.Command]("Counter") + } + + val replyMessageAdapter: ActorRef[akka.cluster.sharding.ShardRegion.ClusterShardingStats] = ??? + + //#get-cluster-sharding-stats + import akka.cluster.sharding.typed.GetClusterShardingStats + import akka.cluster.sharding.ShardRegion.ClusterShardingStats + import scala.concurrent.duration._ + + val replyTo: ActorRef[ClusterShardingStats] = replyMessageAdapter + val timeout: FiniteDuration = 5.seconds + + ClusterSharding(system).shardState ! GetClusterShardingStats(Counter.TypeKey, timeout, replyTo) + //#get-cluster-sharding-stats + } + } diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index 54cf382c8e..42be1906a1 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -223,6 +223,7 @@ object ShardRegion { /** * Send this message to the `ShardRegion` actor to request for [[CurrentRegions]], * which contains the addresses of all registered regions. + * * Intended for testing purpose to see when cluster sharding is "ready" or to monitor * the state of the shard regions. */ @@ -281,6 +282,7 @@ object ShardRegion { * Send this message to the `ShardRegion` actor to request for [[ShardRegionStats]], * which contains statistics about the currently running sharded entities in the * entire region. + * * Intended for testing purpose to see when cluster sharding is "ready" or to monitor * the state of the shard regions. * diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala index 8d2d1fbfa2..25ccdbd0d5 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala @@ -6,11 +6,12 @@ package akka.cluster.sharding import scala.concurrent.duration._ -import com.typesafe.config.ConfigFactory - import akka.actor._ -import akka.cluster.{ Cluster, MemberStatus } -import akka.testkit.{ TestDuration, TestProbe } +import akka.cluster.Cluster +import akka.cluster.MemberStatus +import akka.testkit.TestDuration +import akka.testkit.TestProbe +import com.typesafe.config.ConfigFactory object ClusterShardingGetStatsSpec { import MultiNodeClusterShardingSpec.PingPongActor diff --git a/akka-docs/src/main/paradox/typed/cluster-sharding.md b/akka-docs/src/main/paradox/typed/cluster-sharding.md index 3e1aa3e4d1..655a1e6fb8 100644 --- a/akka-docs/src/main/paradox/typed/cluster-sharding.md +++ b/akka-docs/src/main/paradox/typed/cluster-sharding.md @@ -374,6 +374,36 @@ rebalanced to other nodes. See @ref:[How To Startup when Cluster Size Reached](cluster.md#how-to-startup-when-a-cluster-size-is-reached) for more information about `min-nr-of-members`. +## Inspecting cluster sharding state + +Two requests to inspect the cluster state are available: + +@apidoc[akka.cluster.sharding.typed.GetShardRegionState] which will reply with a +@apidoc[ShardRegion.CurrentShardRegionState] that contains the identifiers of the shards running in +a Region and what entities are alive for each of them. + +Scala +: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #get-shard-region-state } + +Java +: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #get-shard-region-state } + +@apidoc[akka.cluster.sharding.typed.GetClusterShardingStats] which will query all the regions in the cluster and reply with a +@apidoc[ShardRegion.ClusterShardingStats] containing the identifiers of the shards running in each region and a count +of entities that are alive in each shard. + +Scala +: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #get-cluster-sharding-stats } + +Java +: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #get-cluster-sharding-stats } + +If any shard queries failed, for example due to timeout if a shard was too busy to reply within the configured `akka.cluster.sharding.shard-region-query-timeout`, +`ShardRegion.CurrentShardRegionState` and `ShardRegion.ClusterShardingStats` will also include the set of shard identifiers by region that failed. + +The purpose of these messages is testing and monitoring, they are not provided to give access to +directly sending messages to the individual entities. + ## Lease A @ref[lease](../coordination.md) can be used as an additional safety measure to ensure a shard