diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingQuery.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingQuery.scala index 29a016eaff..576db38c9e 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingQuery.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingQuery.scala @@ -5,8 +5,11 @@ package akka.cluster.sharding.typed import akka.actor.typed.ActorRef -import akka.cluster.sharding.ShardRegion.CurrentShardRegionState +import akka.cluster.sharding.ShardRegion.{ ClusterShardingStats, CurrentShardRegionState } import akka.cluster.sharding.typed.scaladsl.EntityTypeKey +import akka.util.JavaDurationConverters + +import scala.concurrent.duration.FiniteDuration /** * Protocol for querying sharding state e.g. A ShardRegion's state @@ -30,4 +33,24 @@ final case class GetShardRegionState(entityTypeKey: EntityTypeKey[_], replyTo: A this(entityTypeKey.asScala, replyTo) } -// TODO - GetClusterShardingStats +/** + * Query the statistics about the currently running sharded entities in the + * entire cluster. If the given `timeout` is reached without answers from all + * shard regions the reply will contain an empty map of regions. + * + * @param timeout the timeout applied to querying all alive regions + * @param replyTo the actor to send the result to + */ +final case class GetClusterShardingStats(timeout: FiniteDuration, replyTo: ActorRef[ClusterShardingStats]) + extends ClusterShardingQuery { + + /** + * Java API + * + * Query the statistics about the currently running sharded entities in the + * entire cluster. If the given `timeout` is reached without answers from all + * shard regions the reply will contain an empty map of regions. + */ + def this(timeout: java.time.Duration, replyTo: ActorRef[ClusterShardingStats]) = + this(JavaDurationConverters.asFiniteDuration(timeout), replyTo) +} diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardingState.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardingState.scala index 1f08f0c1c1..eec7b4f7ee 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardingState.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardingState.scala @@ -3,27 +3,62 @@ */ package akka.cluster.sharding.typed.internal + +import scala.concurrent.Future + import akka.actor.typed.Behavior import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.adapter._ import akka.annotation.InternalApi -import akka.cluster.sharding.{ ClusterSharding, ShardRegion } import akka.cluster.sharding.ShardRegion.CurrentShardRegionState -import akka.cluster.sharding.typed.{ ClusterShardingQuery, GetShardRegionState } +import akka.cluster.sharding.typed.ClusterShardingQuery +import akka.cluster.sharding.typed.GetClusterShardingStats +import akka.cluster.sharding.typed.GetShardRegionState +import akka.cluster.sharding.ClusterSharding +import akka.cluster.sharding.ShardRegion +import akka.pattern.AskTimeoutException +import akka.util.Timeout /** * INTERNAL API */ @InternalApi private[akka] object ShardingState { - def behavior(classicSharding: ClusterSharding): Behavior[ClusterShardingQuery] = Behaviors.receiveMessage { - case GetShardRegionState(key, replyTo) => - if (classicSharding.getShardTypeNames.contains(key.name)) { - classicSharding.shardRegion(key.name).tell(ShardRegion.GetShardRegionState, replyTo.toClassic) - } else { - replyTo ! CurrentShardRegionState(Set.empty) + def behavior(classicSharding: ClusterSharding): Behavior[ClusterShardingQuery] = + Behaviors.setup { context => + Behaviors.receiveMessage { + case GetShardRegionState(key, replyTo) => + if (classicSharding.getShardTypeNames.contains(key.name)) { + classicSharding.shardRegion(key.name).tell(ShardRegion.GetShardRegionState, replyTo.toClassic) + } else { + replyTo ! CurrentShardRegionState(Set.empty) + } + Behaviors.same + + case GetClusterShardingStats(timeout, replyTo) => + import akka.pattern.ask + import akka.pattern.pipe + implicit val t: Timeout = timeout + implicit val ec = context.system.executionContext + + val regions = classicSharding.shardTypeNames.map(classicSharding.shardRegion) + + if (regions.nonEmpty) { + Future + .firstCompletedOf(regions.map { region => + (region ? ShardRegion.GetClusterShardingStats(timeout)).mapTo[ShardRegion.ClusterShardingStats] + }) + .recover { + case _: AskTimeoutException => + ShardRegion.ClusterShardingStats(Map.empty) + } + .pipeTo(replyTo.toClassic) + } else { + replyTo ! ShardRegion.ClusterShardingStats(Map.empty) + } + + Behaviors.same } - Behaviors.same - } + } } diff --git a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingStatsSpec.scala b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingStatsSpec.scala new file mode 100644 index 0000000000..7f55998e1d --- /dev/null +++ b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingStatsSpec.scala @@ -0,0 +1,102 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.cluster.sharding.typed + +import akka.actor.testkit.typed.scaladsl.TestProbe +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.{ ActorRef, Behavior } +import akka.cluster.MultiNodeClusterSpec +import akka.cluster.sharding.ShardRegion.ClusterShardingStats +import akka.cluster.sharding.typed.scaladsl.{ ClusterSharding, Entity, EntityTypeKey } +import akka.cluster.typed.MultiNodeTypedClusterSpec +import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec } +import akka.serialization.jackson.CborSerializable +import com.typesafe.config.ConfigFactory +import org.scalatest.concurrent.ScalaFutures + +object ClusterShardingStatsSpecConfig extends MultiNodeConfig { + + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig(ConfigFactory.parseString(""" + akka.log-dead-letters-during-shutdown = off + akka.cluster.sharding.updating-state-timeout = 2s + akka.cluster.sharding.waiting-for-state-timeout = 2s + """).withFallback(MultiNodeClusterSpec.clusterConfig)) + +} + +class ClusterShardingStatsSpecMultiJvmNode1 extends ClusterShardingStatsSpec +class ClusterShardingStatsSpecMultiJvmNode2 extends ClusterShardingStatsSpec +class ClusterShardingStatsSpecMultiJvmNode3 extends ClusterShardingStatsSpec + +object Pinger { + sealed trait Command extends CborSerializable + case class Ping(id: Int, ref: ActorRef[Pong]) extends Command + case class Pong(id: Int) extends CborSerializable + + def apply(): Behavior[Command] = { + Behaviors.receiveMessage[Command] { + case Ping(id: Int, ref) => + ref ! Pong(id) + Behaviors.same + } + } + +} + +abstract class ClusterShardingStatsSpec + extends MultiNodeSpec(ClusterShardingStatsSpecConfig) + with MultiNodeTypedClusterSpec + with ScalaFutures { + + import ClusterShardingStatsSpecConfig._ + import Pinger._ + + val typeKey = EntityTypeKey[Command]("ping") + + val entityId = "ping-1" + + val sharding = ClusterSharding(typedSystem) + + val settings = ClusterShardingSettings(typedSystem) + + val queryTimeout = settings.shardRegionQueryTimeout * roles.size.toLong //numeric widening y'all + + "Cluster sharding stats" must { + "form cluster" in { + formCluster(first, second, third) + } + + "get shard stats" in { + + sharding.init(Entity(typeKey)(_ => Pinger())) + enterBarrier("sharding started") + + import akka.actor.typed.scaladsl.adapter._ + val entityRef = ClusterSharding(system.toTyped).entityRefFor(typeKey, entityId) + val pongProbe = TestProbe[Pong] + + entityRef ! Ping(1, pongProbe.ref) + pongProbe.expectMessageType[Pong] + enterBarrier("sharding-initialized") + + runOn(first, second, third) { + val replyToProbe = TestProbe[ClusterShardingStats]() + sharding.shardState ! GetClusterShardingStats(queryTimeout, replyToProbe.ref) + + val stats = replyToProbe.expectMessageType[ClusterShardingStats](queryTimeout) + stats.regions.size shouldEqual 3 + stats.regions.values.flatMap(_.stats.values).sum shouldEqual 1 + } + enterBarrier("done") + + } + + } + +} diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingStatsSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingStatsSpec.scala new file mode 100644 index 0000000000..c971828a23 --- /dev/null +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingStatsSpec.scala @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.cluster.sharding.typed.scaladsl + +import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit, TestProbe } +import akka.actor.typed.ActorRef +import akka.cluster.sharding.ShardRegion.{ ClusterShardingStats, ShardRegionStats } +import akka.cluster.sharding.typed.scaladsl.ClusterShardingSpec._ +import akka.cluster.sharding.typed.{ ClusterShardingSettings, GetClusterShardingStats } +import akka.cluster.typed.{ Cluster, Join, SelfUp } +import org.scalatest.wordspec.AnyWordSpecLike + +class ClusterShardingStatsSpec + extends ScalaTestWithActorTestKit(ClusterShardingSpec.config) + with AnyWordSpecLike + with LogCapturing { + + val sharding = ClusterSharding(system) + + val typeKey: EntityTypeKey[IdTestProtocol] = ClusterShardingSpec.typeKeyWithoutEnvelopes + + val shardExtractor = ClusterShardingSpec.idTestProtocolMessageExtractor + + // no need to scale this up here for the cluster query versus one region + val queryTimeout = ClusterShardingSettings(system).shardRegionQueryTimeout + + "Cluster Sharding" must { + "return empty statistics if there are no running sharded entities" in { + val cluster = Cluster(system) + val upProbe = TestProbe[SelfUp]() + + cluster.subscriptions ! akka.cluster.typed.Subscribe(upProbe.ref, classOf[SelfUp]) + cluster.manager ! Join(cluster.selfMember.address) + upProbe.expectMessageType[SelfUp] + + val emptyProbe = TestProbe[ClusterShardingStats]() + sharding.shardState ! GetClusterShardingStats(queryTimeout, emptyProbe.ref) + emptyProbe.expectMessage(ClusterShardingStats(Map.empty)) + } + + "allow querying of statistics of the currently running sharded entities in the entire cluster" in { + + val shardingRef: ActorRef[IdTestProtocol] = sharding.init( + Entity(typeKey)(_ => ClusterShardingSpec.behaviorWithId()) + .withStopMessage(IdStopPlz()) + .withMessageExtractor(idTestProtocolMessageExtractor)) + + val replyProbe = TestProbe[String]() + val id1 = "id1" + shardingRef ! IdReplyPlz(id1, replyProbe.ref) + replyProbe.expectMessage("Hello!") + + val replyToProbe = TestProbe[ClusterShardingStats]() + val replyTo = replyToProbe.ref + + //#get-cluster-sharding-stats + ClusterSharding(system).shardState ! GetClusterShardingStats(queryTimeout, replyTo) + val stats = replyToProbe.expectMessageType[ClusterShardingStats] + //#get-cluster-sharding-stats + + val expect = ClusterShardingStats( + Map(Cluster(system).selfMember.address -> ShardRegionStats(Map(shardExtractor.shardId(id1) -> 1), Set.empty))) + + stats shouldEqual expect + } + } + +}