Support GetClusterShardingStats query in Typed #24466

This commit is contained in:
Helena Edelson 2020-02-28 15:10:47 -08:00 committed by Patrik Nordwall
parent c2945a3e7f
commit c0e31978e6
4 changed files with 242 additions and 12 deletions

View file

@ -5,8 +5,11 @@
package akka.cluster.sharding.typed
import akka.actor.typed.ActorRef
import akka.cluster.sharding.ShardRegion.CurrentShardRegionState
import akka.cluster.sharding.ShardRegion.{ ClusterShardingStats, CurrentShardRegionState }
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.util.JavaDurationConverters
import scala.concurrent.duration.FiniteDuration
/**
* Protocol for querying sharding state e.g. A ShardRegion's state
@ -30,4 +33,24 @@ final case class GetShardRegionState(entityTypeKey: EntityTypeKey[_], replyTo: A
this(entityTypeKey.asScala, replyTo)
}
// TODO - GetClusterShardingStats
/**
* Query the statistics about the currently running sharded entities in the
* entire cluster. If the given `timeout` is reached without answers from all
* shard regions the reply will contain an empty map of regions.
*
* @param timeout the timeout applied to querying all alive regions
* @param replyTo the actor to send the result to
*/
final case class GetClusterShardingStats(timeout: FiniteDuration, replyTo: ActorRef[ClusterShardingStats])
extends ClusterShardingQuery {
/**
* Java API
*
* Query the statistics about the currently running sharded entities in the
* entire cluster. If the given `timeout` is reached without answers from all
* shard regions the reply will contain an empty map of regions.
*/
def this(timeout: java.time.Duration, replyTo: ActorRef[ClusterShardingStats]) =
this(JavaDurationConverters.asFiniteDuration(timeout), replyTo)
}

View file

@ -3,27 +3,62 @@
*/
package akka.cluster.sharding.typed.internal
import scala.concurrent.Future
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.annotation.InternalApi
import akka.cluster.sharding.{ ClusterSharding, ShardRegion }
import akka.cluster.sharding.ShardRegion.CurrentShardRegionState
import akka.cluster.sharding.typed.{ ClusterShardingQuery, GetShardRegionState }
import akka.cluster.sharding.typed.ClusterShardingQuery
import akka.cluster.sharding.typed.GetClusterShardingStats
import akka.cluster.sharding.typed.GetShardRegionState
import akka.cluster.sharding.ClusterSharding
import akka.cluster.sharding.ShardRegion
import akka.pattern.AskTimeoutException
import akka.util.Timeout
/**
* INTERNAL API
*/
@InternalApi private[akka] object ShardingState {
def behavior(classicSharding: ClusterSharding): Behavior[ClusterShardingQuery] = Behaviors.receiveMessage {
case GetShardRegionState(key, replyTo) =>
if (classicSharding.getShardTypeNames.contains(key.name)) {
classicSharding.shardRegion(key.name).tell(ShardRegion.GetShardRegionState, replyTo.toClassic)
} else {
replyTo ! CurrentShardRegionState(Set.empty)
def behavior(classicSharding: ClusterSharding): Behavior[ClusterShardingQuery] =
Behaviors.setup { context =>
Behaviors.receiveMessage {
case GetShardRegionState(key, replyTo) =>
if (classicSharding.getShardTypeNames.contains(key.name)) {
classicSharding.shardRegion(key.name).tell(ShardRegion.GetShardRegionState, replyTo.toClassic)
} else {
replyTo ! CurrentShardRegionState(Set.empty)
}
Behaviors.same
case GetClusterShardingStats(timeout, replyTo) =>
import akka.pattern.ask
import akka.pattern.pipe
implicit val t: Timeout = timeout
implicit val ec = context.system.executionContext
val regions = classicSharding.shardTypeNames.map(classicSharding.shardRegion)
if (regions.nonEmpty) {
Future
.firstCompletedOf(regions.map { region =>
(region ? ShardRegion.GetClusterShardingStats(timeout)).mapTo[ShardRegion.ClusterShardingStats]
})
.recover {
case _: AskTimeoutException =>
ShardRegion.ClusterShardingStats(Map.empty)
}
.pipeTo(replyTo.toClassic)
} else {
replyTo ! ShardRegion.ClusterShardingStats(Map.empty)
}
Behaviors.same
}
Behaviors.same
}
}
}

View file

@ -0,0 +1,102 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.typed
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ ActorRef, Behavior }
import akka.cluster.MultiNodeClusterSpec
import akka.cluster.sharding.ShardRegion.ClusterShardingStats
import akka.cluster.sharding.typed.scaladsl.{ ClusterSharding, Entity, EntityTypeKey }
import akka.cluster.typed.MultiNodeTypedClusterSpec
import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec }
import akka.serialization.jackson.CborSerializable
import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.ScalaFutures
object ClusterShardingStatsSpecConfig extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
commonConfig(ConfigFactory.parseString("""
akka.log-dead-letters-during-shutdown = off
akka.cluster.sharding.updating-state-timeout = 2s
akka.cluster.sharding.waiting-for-state-timeout = 2s
""").withFallback(MultiNodeClusterSpec.clusterConfig))
}
class ClusterShardingStatsSpecMultiJvmNode1 extends ClusterShardingStatsSpec
class ClusterShardingStatsSpecMultiJvmNode2 extends ClusterShardingStatsSpec
class ClusterShardingStatsSpecMultiJvmNode3 extends ClusterShardingStatsSpec
object Pinger {
sealed trait Command extends CborSerializable
case class Ping(id: Int, ref: ActorRef[Pong]) extends Command
case class Pong(id: Int) extends CborSerializable
def apply(): Behavior[Command] = {
Behaviors.receiveMessage[Command] {
case Ping(id: Int, ref) =>
ref ! Pong(id)
Behaviors.same
}
}
}
abstract class ClusterShardingStatsSpec
extends MultiNodeSpec(ClusterShardingStatsSpecConfig)
with MultiNodeTypedClusterSpec
with ScalaFutures {
import ClusterShardingStatsSpecConfig._
import Pinger._
val typeKey = EntityTypeKey[Command]("ping")
val entityId = "ping-1"
val sharding = ClusterSharding(typedSystem)
val settings = ClusterShardingSettings(typedSystem)
val queryTimeout = settings.shardRegionQueryTimeout * roles.size.toLong //numeric widening y'all
"Cluster sharding stats" must {
"form cluster" in {
formCluster(first, second, third)
}
"get shard stats" in {
sharding.init(Entity(typeKey)(_ => Pinger()))
enterBarrier("sharding started")
import akka.actor.typed.scaladsl.adapter._
val entityRef = ClusterSharding(system.toTyped).entityRefFor(typeKey, entityId)
val pongProbe = TestProbe[Pong]
entityRef ! Ping(1, pongProbe.ref)
pongProbe.expectMessageType[Pong]
enterBarrier("sharding-initialized")
runOn(first, second, third) {
val replyToProbe = TestProbe[ClusterShardingStats]()
sharding.shardState ! GetClusterShardingStats(queryTimeout, replyToProbe.ref)
val stats = replyToProbe.expectMessageType[ClusterShardingStats](queryTimeout)
stats.regions.size shouldEqual 3
stats.regions.values.flatMap(_.stats.values).sum shouldEqual 1
}
enterBarrier("done")
}
}
}

View file

@ -0,0 +1,70 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.typed.scaladsl
import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit, TestProbe }
import akka.actor.typed.ActorRef
import akka.cluster.sharding.ShardRegion.{ ClusterShardingStats, ShardRegionStats }
import akka.cluster.sharding.typed.scaladsl.ClusterShardingSpec._
import akka.cluster.sharding.typed.{ ClusterShardingSettings, GetClusterShardingStats }
import akka.cluster.typed.{ Cluster, Join, SelfUp }
import org.scalatest.wordspec.AnyWordSpecLike
class ClusterShardingStatsSpec
extends ScalaTestWithActorTestKit(ClusterShardingSpec.config)
with AnyWordSpecLike
with LogCapturing {
val sharding = ClusterSharding(system)
val typeKey: EntityTypeKey[IdTestProtocol] = ClusterShardingSpec.typeKeyWithoutEnvelopes
val shardExtractor = ClusterShardingSpec.idTestProtocolMessageExtractor
// no need to scale this up here for the cluster query versus one region
val queryTimeout = ClusterShardingSettings(system).shardRegionQueryTimeout
"Cluster Sharding" must {
"return empty statistics if there are no running sharded entities" in {
val cluster = Cluster(system)
val upProbe = TestProbe[SelfUp]()
cluster.subscriptions ! akka.cluster.typed.Subscribe(upProbe.ref, classOf[SelfUp])
cluster.manager ! Join(cluster.selfMember.address)
upProbe.expectMessageType[SelfUp]
val emptyProbe = TestProbe[ClusterShardingStats]()
sharding.shardState ! GetClusterShardingStats(queryTimeout, emptyProbe.ref)
emptyProbe.expectMessage(ClusterShardingStats(Map.empty))
}
"allow querying of statistics of the currently running sharded entities in the entire cluster" in {
val shardingRef: ActorRef[IdTestProtocol] = sharding.init(
Entity(typeKey)(_ => ClusterShardingSpec.behaviorWithId())
.withStopMessage(IdStopPlz())
.withMessageExtractor(idTestProtocolMessageExtractor))
val replyProbe = TestProbe[String]()
val id1 = "id1"
shardingRef ! IdReplyPlz(id1, replyProbe.ref)
replyProbe.expectMessage("Hello!")
val replyToProbe = TestProbe[ClusterShardingStats]()
val replyTo = replyToProbe.ref
//#get-cluster-sharding-stats
ClusterSharding(system).shardState ! GetClusterShardingStats(queryTimeout, replyTo)
val stats = replyToProbe.expectMessageType[ClusterShardingStats]
//#get-cluster-sharding-stats
val expect = ClusterShardingStats(
Map(Cluster(system).selfMember.address -> ShardRegionStats(Map(shardExtractor.shardId(id1) -> 1), Set.empty)))
stats shouldEqual expect
}
}
}