From 91398cdcdd02f55fe3a5d454f25f9d2e17f251ca Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Fri, 25 Sep 2020 11:55:29 +0100 Subject: [PATCH] Cluster sharding health check (#29638) * and update akka-docs/src/main/paradox/typed/cluster-sharding.md --- .../src/main/resources/reference.conf | 18 ++++ .../sharding/ClusterShardingHealthCheck.scala | 102 ++++++++++++++++++ .../akka/cluster/sharding/ShardRegion.scala | 21 ++++ .../ClusterShardingHealthCheckSpec.scala | 99 +++++++++++++++++ .../main/paradox/typed/cluster-sharding.md | 22 ++++ 5 files changed, 262 insertions(+) create mode 100644 akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingHealthCheck.scala create mode 100644 akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingHealthCheckSpec.scala diff --git a/akka-cluster-sharding/src/main/resources/reference.conf b/akka-cluster-sharding/src/main/resources/reference.conf index 617ac625ea..f8bf055241 100644 --- a/akka-cluster-sharding/src/main/resources/reference.conf +++ b/akka-cluster-sharding/src/main/resources/reference.conf @@ -245,9 +245,27 @@ akka.cluster.sharding { # Mostly for the Akka test suite, if off the invalid transition is logged as a warning instead of throwing and # crashing the shard. fail-on-invalid-entity-state-transition = off + + # Healthcheck that can be used with Akka management health checks: https://doc.akka.io/docs/akka-management/current/healthchecks.html + healthcheck { + # sharding names to check have registered with the coordinator for the health check to pass + # once initial registration has taken place the health check always returns true to prevent the coordinator + # moving making the health check of all nodes fail + # by default no sharding instances are monitored + names = [] + + # Timeout for the local shard region to respond. This should be lower than your monitoring system's + # timeout for health checks + timeout = 5s + } } # //#sharding-ext-config +# Enable health check by default for when Akka management is on the classpath +akka.management.health-checks.readiness-checks { + sharding = "akka.cluster.sharding.ClusterShardingHealthCheck" +} + akka.cluster { configuration-compatibility-check { checkers { diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingHealthCheck.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingHealthCheck.scala new file mode 100644 index 0000000000..a2e488f308 --- /dev/null +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingHealthCheck.scala @@ -0,0 +1,102 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.cluster.sharding + +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.annotation.ApiMayChange +import akka.event.Logging +import akka.pattern.ask +import akka.util.Timeout +import akka.annotation.InternalApi +import akka.pattern.AskTimeoutException +import akka.util.ccompat.JavaConverters._ +import akka.util.JavaDurationConverters._ + +import scala.concurrent.Future +import com.typesafe.config.Config + +import scala.concurrent.duration.FiniteDuration + +/** + * Internal API + */ +@InternalApi +private[akka] object ClusterShardingHealthCheckSettings { + def apply(config: Config): ClusterShardingHealthCheckSettings = + new ClusterShardingHealthCheckSettings( + config.getStringList("names").asScala.toSet, + config.getDuration("timeout").asScala) +} + +@ApiMayChange +final class ClusterShardingHealthCheckSettings(val names: Set[String], val timeout: FiniteDuration) + +private object ClusterShardingHealthCheck { + val Success = Future.successful(true) +} + +/** + * INTERNAL API (ctr) + */ +@ApiMayChange +final class ClusterShardingHealthCheck private[akka] ( + system: ActorSystem, + settings: ClusterShardingHealthCheckSettings, + shardRegion: String => ActorRef) + extends (() => Future[Boolean]) { + + private val log = Logging(system, classOf[ClusterShardingHealthCheck]) + + def this(system: ActorSystem) = + this( + system, + ClusterShardingHealthCheckSettings(system.settings.config.getConfig("akka.cluster.sharding.healthcheck")), + name => ClusterSharding(system).shardRegion(name)) + + private implicit val timeout: Timeout = settings.timeout + private implicit val ec = system.dispatchers.internalDispatcher + + // Once the check has passed it always does + @volatile private var registered = false + + override def apply(): Future[Boolean] = { + if (settings.names.isEmpty || registered) { + ClusterShardingHealthCheck.Success + } else { + Future + .traverse(settings.names) { name => + shardRegion(name) // this can throw if shard region not registered and it'll fail the check + .ask(ShardRegion.GetShardRegionStatus) + .mapTo[ShardRegion.ShardRegionStatus] + } + .map { allResponses => + val allRegistered = allResponses.forall(_.registeredWithCoordinator) + if (!allRegistered && log.isInfoEnabled) { + log.info( + "Not all shard regions have registered with coordinator. Still to register: [{}]", + allResponses + .collect { + case response if !response.registeredWithCoordinator => response.typeName + } + .mkString(",")) + } + if (allRegistered) { + registered = true + } + allRegistered + } + .recover { + case _: AskTimeoutException => + if (log.isDebugEnabled) { + log.debug( + "Shard regions [{}] did not respond in time. Failing health check.", + settings.names.mkString(",")) + } + false + } + } + } +} diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index 33b65941e5..b2bc5a7f5e 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -423,6 +423,24 @@ object ShardRegion { } } + /** + * INTERNAL API + * + * Discover if the shard region is registered with the coordinator. + * Not serializable as only to be sent to the local shard region + * Response is [[ShardRegionState]] + */ + @InternalApi + private[akka] final object GetShardRegionStatus extends ShardRegionQuery + + /** + * INTERNAL API + * + * Status of a ShardRegion. Only for local requests so not serializable. + */ + @InternalApi + private[akka] final class ShardRegionStatus(val typeName: String, val registeredWithCoordinator: Boolean) + private case object Retry extends ShardRegionCommand private case object RegisterRetry extends ShardRegionCommand @@ -822,6 +840,9 @@ private[akka] class ShardRegion( case GetShardRegionStats => replyToRegionStatsQuery(sender()) + case GetShardRegionStatus => + sender() ! new ShardRegionStatus(typeName, coordinator.isDefined) + case _ => unhandled(query) } diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingHealthCheckSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingHealthCheckSpec.scala new file mode 100644 index 0000000000..176c754561 --- /dev/null +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingHealthCheckSpec.scala @@ -0,0 +1,99 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.cluster.sharding + +import akka.testkit.AkkaSpec +import akka.testkit.TestProbe +import akka.testkit.WithLogCapturing +import com.typesafe.config.ConfigFactory +import org.scalatest.concurrent.ScalaFutures + +import scala.concurrent.duration._ + +object ClusterShardingHealthCheckSpec { + val config = ConfigFactory.parseString(""" + akka.loglevel = DEBUG + akka.loggers = ["akka.testkit.SilenceAllTestEventListener"] + """.stripMargin) +} + +class ClusterShardingHealthCheckSpec + extends AkkaSpec(ClusterShardingHealthCheckSpec.config) + with WithLogCapturing + with ScalaFutures { + + "Sharding health check" should { + "pass if no checks configured" in { + val shardRegionProbe = TestProbe() + val check = new ClusterShardingHealthCheck( + system, + new ClusterShardingHealthCheckSettings(Set.empty, 1.second), + _ => shardRegionProbe.ref) + check().futureValue shouldEqual true + } + "pass if all region return true" in { + val shardRegionProbe = TestProbe() + val check = new ClusterShardingHealthCheck( + system, + new ClusterShardingHealthCheckSettings(Set("cat"), 1.second), + _ => shardRegionProbe.ref) + val response = check() + shardRegionProbe.expectMsg(ShardRegion.GetShardRegionStatus) + shardRegionProbe.reply(new ShardRegion.ShardRegionStatus("cat", true)) + response.futureValue shouldEqual true + } + "fail if all region returns false" in { + val shardRegionProbe = TestProbe() + val check = new ClusterShardingHealthCheck( + system, + new ClusterShardingHealthCheckSettings(Set("cat"), 1.second), + _ => shardRegionProbe.ref) + val response = check() + shardRegionProbe.expectMsg(ShardRegion.GetShardRegionStatus) + shardRegionProbe.reply(new ShardRegion.ShardRegionStatus("cat", false)) + response.futureValue shouldEqual false + } + "fail if a subset region returns false" in { + val shardRegionProbe = TestProbe() + val check = new ClusterShardingHealthCheck( + system, + new ClusterShardingHealthCheckSettings(Set("cat", "dog"), 1.second), + _ => shardRegionProbe.ref) + val response = check() + shardRegionProbe.expectMsg(ShardRegion.GetShardRegionStatus) + shardRegionProbe.reply(new ShardRegion.ShardRegionStatus("cat", true)) + shardRegionProbe.expectMsg(ShardRegion.GetShardRegionStatus) + shardRegionProbe.reply(new ShardRegion.ShardRegionStatus("dog", false)) + response.futureValue shouldEqual false + } + "times out" in { + val shardRegionProbe = TestProbe() + val check = new ClusterShardingHealthCheck( + system, + new ClusterShardingHealthCheckSettings(Set("cat"), 100.millis), + _ => shardRegionProbe.ref) + val response = check() + shardRegionProbe.expectMsg(ShardRegion.GetShardRegionStatus) + // don't reply + response.futureValue shouldEqual false + } + "always pass after all regions have reported registered" in { + val shardRegionProbe = TestProbe() + val check = new ClusterShardingHealthCheck( + system, + new ClusterShardingHealthCheckSettings(Set("cat"), 1.second), + _ => shardRegionProbe.ref) + val response = check() + shardRegionProbe.expectMsg(ShardRegion.GetShardRegionStatus) + shardRegionProbe.reply(new ShardRegion.ShardRegionStatus("cat", true)) + response.futureValue shouldEqual true + + val secondResponse = check() + shardRegionProbe.expectNoMessage() + secondResponse.futureValue shouldEqual true + } + } + +} diff --git a/akka-docs/src/main/paradox/typed/cluster-sharding.md b/akka-docs/src/main/paradox/typed/cluster-sharding.md index 3161c5254c..3d7ddde50f 100644 --- a/akka-docs/src/main/paradox/typed/cluster-sharding.md +++ b/akka-docs/src/main/paradox/typed/cluster-sharding.md @@ -464,6 +464,28 @@ rebalanced to other nodes. See @ref:[How To Startup when Cluster Size Reached](cluster.md#how-to-startup-when-a-cluster-size-is-reached) for more information about `min-nr-of-members`. +## Health check + +An [Akka Management compatible health check](https://doc.akka.io/docs/akka-management/current/healthchecks.html) is included that returns healthy once the local shard region +has registered with the coordinator. This health check should be used in cases where you don't want to receive production traffic until the local shard region is ready to retrieve locations +for shards. For shard regions that aren't critical and therefore should not block this node becoming ready do not include them. + +The health check does not fail after an initial successful check. Once a shard region is registered and is operational it stays available for incoming message. + +Cluster sharding enables the health check automatically. To disable: + +```ruby +akka.management.health-checks.readiness-checks { + sharding = "" +} +``` + +Monitoring of each shard region is off by default. Add them by defining the entity type names (`EntityTypeKey.name`): + +```ruby +akka.cluster.sharding.healthcheck.names = ["counter-1", "HelloWorld"] +``` + ## Inspecting cluster sharding state Two requests to inspect the cluster state are available: