Merge pull request #28992 from akka/wip-24466-sharding-query-patriknw

Support GetClusterShardingStats query in Typed #24466
This commit is contained in:
Patrik Nordwall 2020-04-30 12:26:39 +02:00 committed by GitHub
commit bef03f8585
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 387 additions and 18 deletions

View file

@ -4,9 +4,13 @@
package akka.cluster.sharding.typed
import scala.concurrent.duration.FiniteDuration
import akka.actor.typed.ActorRef
import akka.cluster.sharding.ShardRegion.ClusterShardingStats
import akka.cluster.sharding.ShardRegion.CurrentShardRegionState
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.util.JavaDurationConverters
/**
* Protocol for querying sharding state e.g. A ShardRegion's state
@ -16,6 +20,11 @@ sealed trait ClusterShardingQuery
/**
* Query the ShardRegion state for the given entity type key. This will get the state of the
* local ShardRegion's state.
*
* Intended for testing purpose to see when cluster sharding is "ready" or to monitor
* the state of the shard regions.
*
* For the statistics for the entire cluster, see [[GetClusterShardingStats]].
*/
final case class GetShardRegionState(entityTypeKey: EntityTypeKey[_], replyTo: ActorRef[CurrentShardRegionState])
extends ClusterShardingQuery {
@ -30,4 +39,33 @@ final case class GetShardRegionState(entityTypeKey: EntityTypeKey[_], replyTo: A
this(entityTypeKey.asScala, replyTo)
}
// TODO - GetClusterShardingStats
/**
* Query the statistics about the currently running sharded entities in the
* entire cluster. If the given `timeout` is reached without answers from all
* shard regions the reply will contain an empty map of regions.
*
* Intended for testing purpose to see when cluster sharding is "ready" or to monitor
* the state of the shard regions.
*
* @param timeout the timeout applied to querying all alive regions
* @param replyTo the actor to send the result to
*/
final case class GetClusterShardingStats(
entityTypeKey: EntityTypeKey[_],
timeout: FiniteDuration,
replyTo: ActorRef[ClusterShardingStats])
extends ClusterShardingQuery {
/**
* Java API
*
* Query the statistics about the currently running sharded entities in the
* entire cluster. If the given `timeout` is reached without answers from all
* shard regions the reply will contain an empty map of regions.
*/
def this(
entityTypeKey: javadsl.EntityTypeKey[_],
timeout: java.time.Duration,
replyTo: ActorRef[ClusterShardingStats]) =
this(entityTypeKey.asScala, JavaDurationConverters.asFiniteDuration(timeout), replyTo)
}

View file

@ -3,27 +3,64 @@
*/
package akka.cluster.sharding.typed.internal
import akka.actor.typed.Behavior
import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.annotation.InternalApi
import akka.cluster.sharding.{ ClusterSharding, ShardRegion }
import akka.cluster.sharding.ClusterSharding
import akka.cluster.sharding.ShardRegion
import akka.cluster.sharding.ShardRegion.CurrentShardRegionState
import akka.cluster.sharding.typed.{ ClusterShardingQuery, GetShardRegionState }
import akka.cluster.sharding.typed.ClusterShardingQuery
import akka.cluster.sharding.typed.GetClusterShardingStats
import akka.cluster.sharding.typed.GetShardRegionState
/**
* INTERNAL API
*/
@InternalApi private[akka] object ShardingState {
def behavior(classicSharding: ClusterSharding): Behavior[ClusterShardingQuery] = Behaviors.receiveMessage {
case GetShardRegionState(key, replyTo) =>
if (classicSharding.getShardTypeNames.contains(key.name)) {
classicSharding.shardRegion(key.name).tell(ShardRegion.GetShardRegionState, replyTo.toClassic)
} else {
replyTo ! CurrentShardRegionState(Set.empty)
def behavior(classicSharding: ClusterSharding): Behavior[ClusterShardingQuery] = {
Behaviors
.supervise[ClusterShardingQuery] {
Behaviors.setup { context =>
Behaviors.receiveMessage {
case GetShardRegionState(key, replyTo) =>
if (classicSharding.getShardTypeNames.contains(key.name)) {
try {
classicSharding.shardRegion(key.name).tell(ShardRegion.GetShardRegionState, replyTo.toClassic)
} catch {
case e: IllegalStateException =>
// classicSharding.shardRegion may throw if not initialized
context.log.warn(e.getMessage)
replyTo ! CurrentShardRegionState(Set.empty)
}
} else {
replyTo ! CurrentShardRegionState(Set.empty)
}
Behaviors.same
case GetClusterShardingStats(key, timeout, replyTo) =>
if (classicSharding.getShardTypeNames.contains(key.name)) {
try {
classicSharding
.shardRegion(key.name)
.tell(ShardRegion.GetClusterShardingStats(timeout), replyTo.toClassic)
} catch {
case e: IllegalStateException =>
// classicSharding.shardRegion may throw if not initialized
context.log.warn(e.getMessage)
replyTo ! ShardRegion.ClusterShardingStats(Map.empty)
}
} else {
replyTo ! ShardRegion.ClusterShardingStats(Map.empty)
}
Behaviors.same
}
}
}
Behaviors.same
.onFailure(SupervisorStrategy.restart)
}
}

View file

@ -0,0 +1,108 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.typed
import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.ScalaFutures
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.MultiNodeClusterSpec
import akka.cluster.sharding.ShardRegion.ClusterShardingStats
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.Entity
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.cluster.typed.MultiNodeTypedClusterSpec
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.serialization.jackson.CborSerializable
object ClusterShardingStatsSpecConfig extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
commonConfig(ConfigFactory.parseString("""
akka.log-dead-letters-during-shutdown = off
akka.cluster.sharding.updating-state-timeout = 2s
akka.cluster.sharding.waiting-for-state-timeout = 2s
""").withFallback(MultiNodeClusterSpec.clusterConfig))
}
class ClusterShardingStatsSpecMultiJvmNode1 extends ClusterShardingStatsSpec
class ClusterShardingStatsSpecMultiJvmNode2 extends ClusterShardingStatsSpec
class ClusterShardingStatsSpecMultiJvmNode3 extends ClusterShardingStatsSpec
object Pinger {
sealed trait Command extends CborSerializable
case class Ping(id: Int, ref: ActorRef[Pong]) extends Command
case class Pong(id: Int) extends CborSerializable
def apply(): Behavior[Command] = {
Behaviors.receiveMessage[Command] {
case Ping(id: Int, ref) =>
ref ! Pong(id)
Behaviors.same
}
}
}
abstract class ClusterShardingStatsSpec
extends MultiNodeSpec(ClusterShardingStatsSpecConfig)
with MultiNodeTypedClusterSpec
with ScalaFutures {
import ClusterShardingStatsSpecConfig._
import Pinger._
private val typeKey = EntityTypeKey[Command]("ping")
private val sharding = ClusterSharding(typedSystem)
private val settings = ClusterShardingSettings(typedSystem)
private val queryTimeout = settings.shardRegionQueryTimeout * roles.size.toLong //numeric widening y'all
"Cluster sharding stats" must {
"form cluster" in {
formCluster(first, second, third)
}
"get shard stats" in {
sharding.init(Entity(typeKey)(_ => Pinger()))
enterBarrier("sharding started")
runOn(first) {
val pongProbe = TestProbe[Pong]()
val entityRef1 = ClusterSharding(typedSystem).entityRefFor(typeKey, "ping-1")
entityRef1 ! Ping(1, pongProbe.ref)
pongProbe.receiveMessage()
val entityRef2 = ClusterSharding(typedSystem).entityRefFor(typeKey, "ping-2")
entityRef2 ! Ping(2, pongProbe.ref)
pongProbe.receiveMessage()
}
enterBarrier("sharding-initialized")
runOn(first, second, third) {
val replyToProbe = TestProbe[ClusterShardingStats]()
sharding.shardState ! GetClusterShardingStats(typeKey, queryTimeout, replyToProbe.ref)
val stats = replyToProbe.receiveMessage(queryTimeout)
stats.regions.size shouldEqual 3
stats.regions.values.flatMap(_.stats.values).sum shouldEqual 2
stats.regions.values.forall(_.failed.isEmpty) shouldBe true
}
enterBarrier("done")
}
}
}

View file

@ -24,6 +24,17 @@ import akka.persistence.typed.PersistenceId;
// #import
// #get-shard-region-state
import akka.cluster.sharding.typed.GetShardRegionState;
import akka.cluster.sharding.ShardRegion.CurrentShardRegionState;
// #get-shard-region-state
// #get-cluster-sharding-stats
import akka.cluster.sharding.typed.GetClusterShardingStats;
import akka.cluster.sharding.ShardRegion.ClusterShardingStats;
// #get-cluster-sharding-stats
import jdocs.akka.persistence.typed.BlogPostEntity;
interface ShardingCompileOnlyTest {
@ -248,4 +259,31 @@ interface ShardingCompileOnlyTest {
ClusterSharding.get(system).entityRefFor(typeKey, entityId, "dc2");
// #proxy-dc-entityref
}
public static void shardRegionQqueryExample() {
ActorSystem system = ActorSystem.create(Behaviors.empty(), "ShardingExample");
ActorRef<CurrentShardRegionState> replyMessageAdapter = null;
EntityTypeKey<Counter.Command> typeKey = EntityTypeKey.create(Counter.Command.class, "Counter");
// #get-shard-region-state
ActorRef<CurrentShardRegionState> replyTo = replyMessageAdapter;
ClusterSharding.get(system).shardState().tell(new GetShardRegionState(typeKey, replyTo));
// #get-shard-region-state
}
public static void shardingStatsQqueryExample() {
ActorSystem system = ActorSystem.create(Behaviors.empty(), "ShardingExample");
ActorRef<ClusterShardingStats> replyMessageAdapter = null;
EntityTypeKey<Counter.Command> typeKey = EntityTypeKey.create(Counter.Command.class, "Counter");
// #get-cluster-sharding-stats
ActorRef<ClusterShardingStats> replyTo = replyMessageAdapter;
Duration timeout = Duration.ofSeconds(5);
ClusterSharding.get(system)
.shardState()
.tell(new GetClusterShardingStats(typeKey, timeout, replyTo));
// #get-cluster-sharding-stats
}
}

View file

@ -19,19 +19,19 @@ class ClusterShardingStateSpec
with AnyWordSpecLike
with LogCapturing {
val sharding = ClusterSharding(system)
private val sharding = ClusterSharding(system)
val shardExtractor = ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) {
private val shardExtractor = ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) {
case IdReplyPlz(id, _) => id
case IdWhoAreYou(id, _) => id
case other => throw new IllegalArgumentException(s"Unexpected message $other")
}
val cluster = Cluster(system)
private val cluster = Cluster(system)
val typeKey: EntityTypeKey[IdTestProtocol] = ClusterShardingSpec.typeKeyWithoutEnvelopes
"Cluster Sharding" must {
"Cluster Sharding CurrentShardRegionState query" must {
"allow querying of the shard region state" in {
val probe = TestProbe[CurrentShardRegionState]()
cluster.manager ! Join(cluster.selfMember.address)

View file

@ -0,0 +1,74 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.typed.scaladsl
import org.scalatest.wordspec.AnyWordSpecLike
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorRef
import akka.cluster.sharding.ShardRegion.ClusterShardingStats
import akka.cluster.sharding.ShardRegion.ShardRegionStats
import akka.cluster.sharding.typed.ClusterShardingSettings
import akka.cluster.sharding.typed.GetClusterShardingStats
import akka.cluster.sharding.typed.scaladsl.ClusterShardingSpec._
import akka.cluster.typed.Cluster
import akka.cluster.typed.Join
import akka.cluster.typed.SelfUp
class ClusterShardingStatsSpec
extends ScalaTestWithActorTestKit(ClusterShardingSpec.config)
with AnyWordSpecLike
with LogCapturing {
private val sharding = ClusterSharding(system)
private val typeKey: EntityTypeKey[IdTestProtocol] = ClusterShardingSpec.typeKeyWithoutEnvelopes
private val shardExtractor = ClusterShardingSpec.idTestProtocolMessageExtractor
// no need to scale this up here for the cluster query versus one region
private val queryTimeout = ClusterShardingSettings(system).shardRegionQueryTimeout
"Cluster Sharding ClusterShardingStats query" must {
"return empty statistics if there are no running sharded entities" in {
val cluster = Cluster(system)
val upProbe = createTestProbe[SelfUp]()
cluster.subscriptions ! akka.cluster.typed.Subscribe(upProbe.ref, classOf[SelfUp])
cluster.manager ! Join(cluster.selfMember.address)
upProbe.expectMessageType[SelfUp]
val replyProbe = createTestProbe[ClusterShardingStats]()
sharding.shardState ! GetClusterShardingStats(typeKey, queryTimeout, replyProbe.ref)
replyProbe.expectMessage(ClusterShardingStats(Map.empty))
}
"allow querying of statistics of the currently running sharded entities in the entire cluster" in {
val shardingRef: ActorRef[IdTestProtocol] = sharding.init(
Entity(typeKey)(_ => ClusterShardingSpec.behaviorWithId())
.withStopMessage(IdStopPlz())
.withMessageExtractor(idTestProtocolMessageExtractor))
val replyProbe = createTestProbe[String]()
val id1 = "id1"
shardingRef ! IdReplyPlz(id1, replyProbe.ref)
replyProbe.expectMessage("Hello!")
val replyToProbe = createTestProbe[ClusterShardingStats]()
val replyTo = replyToProbe.ref
ClusterSharding(system).shardState ! GetClusterShardingStats(typeKey, queryTimeout, replyTo)
val stats = replyToProbe.receiveMessage()
val expect = ClusterShardingStats(
Map(Cluster(system).selfMember.address -> ShardRegionStats(Map(shardExtractor.shardId(id1) -> 1), Set.empty)))
stats shouldEqual expect
}
}
}

View file

@ -14,6 +14,9 @@ import com.github.ghik.silencer.silent
import docs.akka.persistence.typed.BlogPostEntity
import docs.akka.persistence.typed.BlogPostEntity.Command
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
@silent
object ShardingCompileOnlySpec {
@ -169,4 +172,42 @@ object ShardingCompileOnlySpec {
//#sharded-response
}
object ShardRegionStateQuery {
object Counter {
val TypeKey = EntityTypeKey[Basics.Counter.Command]("Counter")
}
val replyMessageAdapter: ActorRef[akka.cluster.sharding.ShardRegion.CurrentShardRegionState] = ???
//#get-shard-region-state
import akka.cluster.sharding.typed.GetShardRegionState
import akka.cluster.sharding.ShardRegion.CurrentShardRegionState
val replyTo: ActorRef[CurrentShardRegionState] = replyMessageAdapter
ClusterSharding(system).shardState ! GetShardRegionState(Counter.TypeKey, replyTo)
//#get-shard-region-state
}
object ClusterShardingStatsQuery {
object Counter {
val TypeKey = EntityTypeKey[Basics.Counter.Command]("Counter")
}
val replyMessageAdapter: ActorRef[akka.cluster.sharding.ShardRegion.ClusterShardingStats] = ???
//#get-cluster-sharding-stats
import akka.cluster.sharding.typed.GetClusterShardingStats
import akka.cluster.sharding.ShardRegion.ClusterShardingStats
import scala.concurrent.duration._
val replyTo: ActorRef[ClusterShardingStats] = replyMessageAdapter
val timeout: FiniteDuration = 5.seconds
ClusterSharding(system).shardState ! GetClusterShardingStats(Counter.TypeKey, timeout, replyTo)
//#get-cluster-sharding-stats
}
}

View file

@ -223,6 +223,7 @@ object ShardRegion {
/**
* Send this message to the `ShardRegion` actor to request for [[CurrentRegions]],
* which contains the addresses of all registered regions.
*
* Intended for testing purpose to see when cluster sharding is "ready" or to monitor
* the state of the shard regions.
*/
@ -281,6 +282,7 @@ object ShardRegion {
* Send this message to the `ShardRegion` actor to request for [[ShardRegionStats]],
* which contains statistics about the currently running sharded entities in the
* entire region.
*
* Intended for testing purpose to see when cluster sharding is "ready" or to monitor
* the state of the shard regions.
*

View file

@ -6,11 +6,12 @@ package akka.cluster.sharding
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor._
import akka.cluster.{ Cluster, MemberStatus }
import akka.testkit.{ TestDuration, TestProbe }
import akka.cluster.Cluster
import akka.cluster.MemberStatus
import akka.testkit.TestDuration
import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory
object ClusterShardingGetStatsSpec {
import MultiNodeClusterShardingSpec.PingPongActor

View file

@ -374,6 +374,36 @@ rebalanced to other nodes.
See @ref:[How To Startup when Cluster Size Reached](cluster.md#how-to-startup-when-a-cluster-size-is-reached)
for more information about `min-nr-of-members`.
## Inspecting cluster sharding state
Two requests to inspect the cluster state are available:
@apidoc[akka.cluster.sharding.typed.GetShardRegionState] which will reply with a
@apidoc[ShardRegion.CurrentShardRegionState] that contains the identifiers of the shards running in
a Region and what entities are alive for each of them.
Scala
: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #get-shard-region-state }
Java
: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #get-shard-region-state }
@apidoc[akka.cluster.sharding.typed.GetClusterShardingStats] which will query all the regions in the cluster and reply with a
@apidoc[ShardRegion.ClusterShardingStats] containing the identifiers of the shards running in each region and a count
of entities that are alive in each shard.
Scala
: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #get-cluster-sharding-stats }
Java
: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #get-cluster-sharding-stats }
If any shard queries failed, for example due to timeout if a shard was too busy to reply within the configured `akka.cluster.sharding.shard-region-query-timeout`,
`ShardRegion.CurrentShardRegionState` and `ShardRegion.ClusterShardingStats` will also include the set of shard identifiers by region that failed.
The purpose of these messages is testing and monitoring, they are not provided to give access to
directly sending messages to the individual entities.
## Lease
A @ref[lease](../coordination.md) can be used as an additional safety measure to ensure a shard