Productionize: GetShardRegionStats returns empty shard set on ask timeout (#27395)

This commit is contained in:
Helena Edelson 2019-07-25 08:00:34 -07:00 committed by GitHub
parent 051ff07ca2
commit 3534a0b977
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 632 additions and 70 deletions

View file

@ -6190,6 +6190,26 @@ public final class ClusterShardingMessages {
*/
akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.MapFieldEntryOrBuilder getStatsOrBuilder(
int index);
// repeated string failed = 2;
/**
* <code>repeated string failed = 2;</code>
*/
java.util.List<java.lang.String>
getFailedList();
/**
* <code>repeated string failed = 2;</code>
*/
int getFailedCount();
/**
* <code>repeated string failed = 2;</code>
*/
java.lang.String getFailed(int index);
/**
* <code>repeated string failed = 2;</code>
*/
akka.protobuf.ByteString
getFailedBytes(int index);
}
/**
* Protobuf type {@code ShardRegionStats}
@ -6250,6 +6270,14 @@ public final class ClusterShardingMessages {
stats_.add(input.readMessage(akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.MapFieldEntry.PARSER, extensionRegistry));
break;
}
case 18: {
if (!((mutable_bitField0_ & 0x00000002) == 0x00000002)) {
failed_ = new akka.protobuf.LazyStringArrayList();
mutable_bitField0_ |= 0x00000002;
}
failed_.add(input.readBytes());
break;
}
}
}
} catch (akka.protobuf.InvalidProtocolBufferException e) {
@ -6261,6 +6289,9 @@ public final class ClusterShardingMessages {
if (((mutable_bitField0_ & 0x00000001) == 0x00000001)) {
stats_ = java.util.Collections.unmodifiableList(stats_);
}
if (((mutable_bitField0_ & 0x00000002) == 0x00000002)) {
failed_ = new akka.protobuf.UnmodifiableLazyStringList(failed_);
}
this.unknownFields = unknownFields.build();
makeExtensionsImmutable();
}
@ -6328,8 +6359,39 @@ public final class ClusterShardingMessages {
return stats_.get(index);
}
// repeated string failed = 2;
public static final int FAILED_FIELD_NUMBER = 2;
private akka.protobuf.LazyStringList failed_;
/**
* <code>repeated string failed = 2;</code>
*/
public java.util.List<java.lang.String>
getFailedList() {
return failed_;
}
/**
* <code>repeated string failed = 2;</code>
*/
public int getFailedCount() {
return failed_.size();
}
/**
* <code>repeated string failed = 2;</code>
*/
public java.lang.String getFailed(int index) {
return failed_.get(index);
}
/**
* <code>repeated string failed = 2;</code>
*/
public akka.protobuf.ByteString
getFailedBytes(int index) {
return failed_.getByteString(index);
}
private void initFields() {
stats_ = java.util.Collections.emptyList();
failed_ = akka.protobuf.LazyStringArrayList.EMPTY;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@ -6346,6 +6408,9 @@ public final class ClusterShardingMessages {
for (int i = 0; i < stats_.size(); i++) {
output.writeMessage(1, stats_.get(i));
}
for (int i = 0; i < failed_.size(); i++) {
output.writeBytes(2, failed_.getByteString(i));
}
getUnknownFields().writeTo(output);
}
@ -6359,6 +6424,15 @@ public final class ClusterShardingMessages {
size += akka.protobuf.CodedOutputStream
.computeMessageSize(1, stats_.get(i));
}
{
int dataSize = 0;
for (int i = 0; i < failed_.size(); i++) {
dataSize += akka.protobuf.CodedOutputStream
.computeBytesSizeNoTag(failed_.getByteString(i));
}
size += dataSize;
size += 1 * getFailedList().size();
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@ -6482,6 +6556,8 @@ public final class ClusterShardingMessages {
} else {
statsBuilder_.clear();
}
failed_ = akka.protobuf.LazyStringArrayList.EMPTY;
bitField0_ = (bitField0_ & ~0x00000002);
return this;
}
@ -6518,6 +6594,12 @@ public final class ClusterShardingMessages {
} else {
result.stats_ = statsBuilder_.build();
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
failed_ = new akka.protobuf.UnmodifiableLazyStringList(
failed_);
bitField0_ = (bitField0_ & ~0x00000002);
}
result.failed_ = failed_;
onBuilt();
return result;
}
@ -6559,6 +6641,16 @@ public final class ClusterShardingMessages {
}
}
}
if (!other.failed_.isEmpty()) {
if (failed_.isEmpty()) {
failed_ = other.failed_;
bitField0_ = (bitField0_ & ~0x00000002);
} else {
ensureFailedIsMutable();
failed_.addAll(other.failed_);
}
onChanged();
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@ -6826,6 +6918,99 @@ public final class ClusterShardingMessages {
return statsBuilder_;
}
// repeated string failed = 2;
private akka.protobuf.LazyStringList failed_ = akka.protobuf.LazyStringArrayList.EMPTY;
private void ensureFailedIsMutable() {
if (!((bitField0_ & 0x00000002) == 0x00000002)) {
failed_ = new akka.protobuf.LazyStringArrayList(failed_);
bitField0_ |= 0x00000002;
}
}
/**
* <code>repeated string failed = 2;</code>
*/
public java.util.List<java.lang.String>
getFailedList() {
return java.util.Collections.unmodifiableList(failed_);
}
/**
* <code>repeated string failed = 2;</code>
*/
public int getFailedCount() {
return failed_.size();
}
/**
* <code>repeated string failed = 2;</code>
*/
public java.lang.String getFailed(int index) {
return failed_.get(index);
}
/**
* <code>repeated string failed = 2;</code>
*/
public akka.protobuf.ByteString
getFailedBytes(int index) {
return failed_.getByteString(index);
}
/**
* <code>repeated string failed = 2;</code>
*/
public Builder setFailed(
int index, java.lang.String value) {
if (value == null) {
throw new NullPointerException();
}
ensureFailedIsMutable();
failed_.set(index, value);
onChanged();
return this;
}
/**
* <code>repeated string failed = 2;</code>
*/
public Builder addFailed(
java.lang.String value) {
if (value == null) {
throw new NullPointerException();
}
ensureFailedIsMutable();
failed_.add(value);
onChanged();
return this;
}
/**
* <code>repeated string failed = 2;</code>
*/
public Builder addAllFailed(
java.lang.Iterable<java.lang.String> values) {
ensureFailedIsMutable();
super.addAll(values, failed_);
onChanged();
return this;
}
/**
* <code>repeated string failed = 2;</code>
*/
public Builder clearFailed() {
failed_ = akka.protobuf.LazyStringArrayList.EMPTY;
bitField0_ = (bitField0_ & ~0x00000002);
onChanged();
return this;
}
/**
* <code>repeated string failed = 2;</code>
*/
public Builder addFailedBytes(
akka.protobuf.ByteString value) {
if (value == null) {
throw new NullPointerException();
}
ensureFailedIsMutable();
failed_.add(value);
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:ShardRegionStats)
}
@ -12033,20 +12218,21 @@ public final class ClusterShardingMessages {
"ties\030\001 \003(\t\"!\n\rEntityStarted\022\020\n\010entityId\030" +
"\001 \002(\t\"!\n\rEntityStopped\022\020\n\010entityId\030\001 \002(\t" +
"\"0\n\nShardStats\022\r\n\005shard\030\001 \002(\t\022\023\n\013entityC" +
"ount\030\002 \002(\005\"1\n\020ShardRegionStats\022\035\n\005stats\030" +
"\001 \003(\0132\016.MapFieldEntry\"+\n\rMapFieldEntry\022\013" +
"\n\003key\030\001 \001(\t\022\r\n\005value\030\002 \001(\005\"/\n\027GetCluster" +
"ShardingStats\022\024\n\014timeoutNanos\030\001 \002(\003\"A\n\024C" +
"lusterShardingStats\022)\n\005stats\030\001 \003(\0132\032.Clu" +
"sterShardingStatsEntry\"X\n\031ClusterShardin" +
"gStatsEntry\022\031\n\007address\030\001 \002(\0132\010.Address\022 ",
"\n\005stats\030\002 \002(\0132\021.ShardRegionStats\"+\n\016Curr" +
"entRegions\022\031\n\007regions\030\001 \003(\0132\010.Address\"K\n" +
"\007Address\022\020\n\010protocol\030\001 \002(\t\022\016\n\006system\030\002 \002" +
"(\t\022\020\n\010hostname\030\003 \002(\t\022\014\n\004port\030\004 \002(\r\"\037\n\013St" +
"artEntity\022\020\n\010entityId\030\001 \002(\t\"3\n\016StartEnti" +
"tyAck\022\020\n\010entityId\030\001 \002(\t\022\017\n\007shardId\030\002 \002(\t" +
"B&\n\"akka.cluster.sharding.protobuf.msgH\001"
"ount\030\002 \002(\005\"A\n\020ShardRegionStats\022\035\n\005stats\030" +
"\001 \003(\0132\016.MapFieldEntry\022\016\n\006failed\030\002 \003(\t\"+\n" +
"\rMapFieldEntry\022\013\n\003key\030\001 \001(\t\022\r\n\005value\030\002 \001" +
"(\005\"/\n\027GetClusterShardingStats\022\024\n\014timeout" +
"Nanos\030\001 \002(\003\"A\n\024ClusterShardingStats\022)\n\005s" +
"tats\030\001 \003(\0132\032.ClusterShardingStatsEntry\"X" +
"\n\031ClusterShardingStatsEntry\022\031\n\007address\030\001",
" \002(\0132\010.Address\022 \n\005stats\030\002 \002(\0132\021.ShardReg" +
"ionStats\"+\n\016CurrentRegions\022\031\n\007regions\030\001 " +
"\003(\0132\010.Address\"K\n\007Address\022\020\n\010protocol\030\001 \002" +
"(\t\022\016\n\006system\030\002 \002(\t\022\020\n\010hostname\030\003 \002(\t\022\014\n\004" +
"port\030\004 \002(\r\"\037\n\013StartEntity\022\020\n\010entityId\030\001 " +
"\002(\t\"3\n\016StartEntityAck\022\020\n\010entityId\030\001 \002(\t\022" +
"\017\n\007shardId\030\002 \002(\tB&\n\"akka.cluster.shardin" +
"g.protobuf.msgH\001"
};
akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -12118,7 +12304,7 @@ public final class ClusterShardingMessages {
internal_static_ShardRegionStats_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_ShardRegionStats_descriptor,
new java.lang.String[] { "Stats", });
new java.lang.String[] { "Stats", "Failed", });
internal_static_MapFieldEntry_descriptor =
getDescriptor().getMessageTypes().get(10);
internal_static_MapFieldEntry_fieldAccessorTable = new

View file

@ -3,3 +3,12 @@ ProblemFilters.exclude[Problem]("akka.cluster.sharding.Shard.*")
# #25191
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardRegion.retryTask")
# #27100 Productionize: GetShardRegionStats returns empty shard set on ask timeout
# askAllShards, an internal function, was renamed to queryShards and can now query all shards or only a subset, e.g. to retry just the shards that failed
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardRegion.askAllShards")
# Added new field for failed shard queries to ShardRegion#ShardRegionStats, converted to class and updated in proto
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.sharding.protobuf.msg.ClusterShardingMessages#ShardRegionStatsOrBuilder.getFailedList")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.sharding.protobuf.msg.ClusterShardingMessages#ShardRegionStatsOrBuilder.getFailed")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.sharding.protobuf.msg.ClusterShardingMessages#ShardRegionStatsOrBuilder.getFailedBytes")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.sharding.protobuf.msg.ClusterShardingMessages#ShardRegionStatsOrBuilder.getFailedCount")

View file

@ -55,6 +55,7 @@ message ShardStats {
message ShardRegionStats {
repeated MapFieldEntry stats = 1;
repeated string failed = 2;
}
message MapFieldEntry {

View file

@ -8,10 +8,11 @@ import java.net.URLEncoder
import java.util.Optional
import java.util.concurrent.ConcurrentHashMap
import akka.util.ccompat.JavaConverters._
import scala.collection.immutable
import scala.concurrent.Await
import scala.util.control.NonFatal
import akka.util.ccompat.JavaConverters._
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.ActorSystem

View file

@ -252,7 +252,7 @@ private[akka] class Shard(
def processChange[E <: StateChange](event: E)(handler: E => Unit): Unit =
handler(event)
def receive = receiveCommand
def receive: Receive = receiveCommand
// Don't send back ShardInitialized so that messages are buffered in the ShardRegion
// while awaiting the lease

View file

@ -6,24 +6,27 @@ package akka.cluster.sharding
import java.net.URLEncoder
import akka.pattern.AskTimeoutException
import akka.util.{ MessageBufferMap, PrettyDuration, Timeout }
import akka.pattern.{ ask, pipe }
import akka.actor._
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.cluster.Member
import akka.cluster.MemberStatus
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.Future
import scala.reflect.ClassTag
import scala.concurrent.Promise
import scala.runtime.AbstractFunction1
import scala.util.Success
import scala.util.Failure
import akka.Done
import akka.annotation.InternalApi
import akka.actor._
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.cluster.Member
import akka.cluster.MemberStatus
import akka.cluster.ClusterSettings
import akka.cluster.ClusterSettings.DataCenter
import akka.cluster.sharding.Shard.ShardStats
import akka.pattern.{ ask, pipe }
import akka.util.{ MessageBufferMap, PrettyDuration, Timeout }
/**
* @see [[ClusterSharding$ ClusterSharding extension]]
@ -293,8 +296,14 @@ object ShardRegion {
*/
def getRegionStatsInstance = GetShardRegionStats
@SerialVersionUID(1L) final case class ShardRegionStats(stats: Map[ShardId, Int])
extends ClusterShardingSerializable {
/**
*
* @param stats the region stats mapping of `ShardId` to number of entities
* @param failed set of shards if any failed to respond within the timeout
*/
@SerialVersionUID(1L) final class ShardRegionStats(val stats: Map[ShardId, Int], val failed: Set[ShardId])
extends ClusterShardingSerializable
with Product {
/**
* Java API
@ -304,6 +313,38 @@ object ShardRegion {
stats.asJava
}
/** Java API */
def getFailed(): java.util.Set[ShardId] = {
import akka.util.ccompat.JavaConverters._
failed.asJava
}
// For binary compatibility
def this(stats: Map[ShardId, Int]) = this(stats, Set.empty[ShardId])
private[sharding] def copy(stats: Map[ShardId, Int] = stats): ShardRegionStats =
new ShardRegionStats(stats, this.failed)
// For binary compatibility: class conversion from case class
override def equals(other: Any): Boolean = other match {
case o: ShardRegionStats => o.stats == stats && o.failed == failed
case _ => false
}
override def hashCode: Int = stats.## + failed.##
override def toString: String = s"ShardRegionStats[stats=$stats, failed=$failed]"
override def productArity: Int = 2
override def productElement(n: Int) =
if (n == 0) stats else if (n == 1) failed else throw new NoSuchElementException
override def canEqual(o: Any): Boolean = o.isInstanceOf[ShardRegionStats]
}
// For binary compatibility: companion kept explicit now that ShardRegionStats is a
// plain class. It retains the one-argument `apply`/`unapply` shape and the function
// parent type `Map[ShardId, Int] => ShardRegionStats`.
// NOTE(review): presumably this mirrors what the compiler generated for the former
// one-field case class - confirm against the MiMa report.
object ShardRegionStats extends AbstractFunction1[Map[ShardId, Int], ShardRegionStats] {
// One-argument factory: delegates to the two-argument factory with an empty `failed` set.
def apply(stats: Map[ShardId, Int]): ShardRegionStats =
apply(stats, Set.empty[ShardId])
// Full factory: `failed` is the set of shard ids passed straight to the constructor.
def apply(stats: Map[ShardId, Int], failed: Set[ShardId]): ShardRegionStats =
new ShardRegionStats(stats, failed)
// Extractor exposes only `stats`; `failed` is not part of the extracted pattern.
def unapply(stats: ShardRegionStats): Option[Map[ShardId, Int]] =
Option(stats.stats)
}
/**
@ -395,7 +436,7 @@ object ShardRegion {
var remaining = entities
def receive = {
def receive: Receive = {
case ReceiveTimeout =>
log.warning(
"HandOffStopMessage[{}] is not handled by some of the entities of the `{}` shard, " +
@ -450,6 +491,7 @@ private[akka] class ShardRegion(
with ActorLogging
with Timers {
import ShardingQueries.ShardsQueryResult
import ShardCoordinator.Internal._
import ShardRegion._
import settings._
@ -694,15 +736,15 @@ private[akka] class ShardRegion(
case None => sender() ! CurrentRegions(Set.empty)
}
case msg: GetClusterShardingStats =>
coordinator.fold(sender ! ClusterShardingStats(Map.empty))(_.forward(msg))
case GetShardRegionState =>
replyToRegionStateQuery(sender())
case GetShardRegionStats =>
replyToRegionStatsQuery(sender())
case msg: GetClusterShardingStats =>
coordinator.fold(sender ! ClusterShardingStats(Map.empty))(_.forward(msg))
case _ => unhandled(query)
}
@ -738,38 +780,52 @@ private[akka] class ShardRegion(
}
def replyToRegionStateQuery(ref: ActorRef): Unit = {
askAllShards[Shard.CurrentShardState](Shard.GetCurrentShardState)
.map { shardStates =>
CurrentShardRegionState(shardStates.map {
case (shardId, state) => ShardRegion.ShardState(shardId, state.entityIds)
}.toSet)
}
.recover {
case _: AskTimeoutException => CurrentShardRegionState(Set.empty)
queryShards[Shard.CurrentShardState](shards, Shard.GetCurrentShardState)
.map { qr =>
// Productionize CurrentShardRegionState #27406
val state =
qr.responses.map(state => ShardRegion.ShardState(state.shardId, state.entityIds)) ++
qr.failed.map(sid => ShardRegion.ShardState(sid, Set.empty))
CurrentShardRegionState(state.toSet)
}
.pipeTo(ref)
}
def replyToRegionStatsQuery(ref: ActorRef): Unit = {
askAllShards[Shard.ShardStats](Shard.GetShardStats)
.map { shardStats =>
ShardRegionStats(shardStats.map {
case (shardId, stats) => (shardId, stats.entityCount)
}.toMap)
}
.recover {
case _: AskTimeoutException => ShardRegionStats(Map.empty)
queryShards[ShardStats](shards, Shard.GetShardStats)
.map { qr =>
ShardRegionStats(qr.responses.map(stats => (stats.shardId, stats.entityCount)).toMap, qr.failed)
}
.pipeTo(ref)
}
def askAllShards[T: ClassTag](msg: Any): Future[Seq[(ShardId, T)]] = {
/**
* Query all or a subset of shards, e.g. unresponsive shards that initially timed out.
* If the number of `shards` is less than `this.shards.size`, this could be a retry.
* Returns a partitioned set of any shards that may have not replied within the
* timeout and shards that did reply, to provide retry on only that subset.
*
* Logs a warning if any of the group timed out.
*
* To check subset unresponsive: {{{ queryShards[T](shards.filterKeys(u.contains), shardQuery) }}}
*/
def queryShards[T: ClassTag](shards: Map[ShardId, ActorRef], msg: Any): Future[ShardsQueryResult[T]] = {
implicit val timeout: Timeout = settings.shardRegionQueryTimeout
Future.sequence(shards.toSeq.map {
case (shardId, ref) => (ref ? msg).mapTo[T].map(t => (shardId, t))
})
Future.traverse(shards.toSeq) { case (shardId, shard) => askOne(shard, msg, shardId) }.map { ps =>
val qr = ShardsQueryResult[T](ps, this.shards.size)
if (qr.failed.nonEmpty) log.warning(qr.toString)
qr
}
}
/**
 * Asks a single shard `msg` and converts the outcome into an `Either`, so that the
 * returned future itself completes successfully for both outcomes handled below.
 *
 * @param shard   the shard actor to ask
 * @param msg     the query message to send
 * @param shardId the id reported back when the ask fails
 * @param timeout the ask timeout
 * @return `Right(reply)` when the reply could be mapped to `T`, otherwise `Left(shardId)`
 */
private def askOne[T: ClassTag](shard: ActorRef, msg: Any, shardId: ShardId)(
implicit timeout: Timeout): Future[Either[ShardId, T]] =
(shard ? msg).mapTo[T].transform {
case Success(t) => Success(Right(t))
// Any failure of the ask or of the `mapTo` is reduced to the shard id; the cause is discarded.
case Failure(_) => Success(Left(shardId))
}
private def tryCompleteGracefulShutdown() =
if (gracefulShutdownInProgress && shards.isEmpty && shardBuffers.isEmpty) {
context.stop(self) // all shards have been rebalanced, complete graceful shutdown

View file

@ -0,0 +1,73 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding
import akka.annotation.InternalApi
/** INTERNAL API */
@InternalApi
private[sharding] object ShardingQueries {

  /**
   * INTERNAL API
   * The result of a group query and metadata.
   *
   * @param failed the queries to shards that failed or did not reply within the
   *               configured timeout. This could be indicative of several states,
   *               for example still in initialization, restart, heavily loaded
   *               and busy, where returning zero entities is
   *               not indicative of the reason
   * @param responses the responses received from the query
   * @param total the total number of shards tracked versus a possible subset
   * @param queried the number of shards queried, which could equal the total or be a
   *                subset if this was a retry of those that timed out
   * @tparam B the type of a successful response
   */
  final case class ShardsQueryResult[B](failed: Set[ShardRegion.ShardId], responses: Seq[B], total: Int, queried: Int) {

    /** Returns true if there was anything to query. */
    private val nonEmpty: Boolean = total > 0 && queried > 0

    /** Returns true if there was anything to query, all were queried and all failed within the timeout. */
    def isTotalFailed: Boolean = nonEmpty && failed.size == total

    /** Returns true if there was a subset to query and all in that subset failed within the timeout. */
    def isAllSubsetFailed: Boolean = nonEmpty && queried < total && failed.size == queried

    /**
     * Human-readable summary of the query outcome, computed once at construction.
     * Distinguishes: nothing to query, everything queried failed (all shards or a subset),
     * and a mixed result with counts of responsive and failed shards.
     */
    override val toString: String = {
      if (total == 0)
        "Shard region had zero shards to gather metadata from."
      else if (isTotalFailed || isAllSubsetFailed) {
        // The trailing space belongs to the optional fragment so that the message
        // is single-spaced whether or not the fragment is present.
        val scope = if (isAllSubsetFailed) "of subset " else ""
        s"All [${failed.size}] shards ${scope}queried failed within the timeout."
      } else {
        s"Queried [$queried] shards of [$total]: responsive [${responses.size}], failed [${failed.size}] within the timeout."
      }
    }
  }

  object ShardsQueryResult {

    /**
     * @param ps the partitioned results of actors queried that did not reply by
     *           the timeout or returned another failure and those that did
     * @param total the total number of actors tracked versus a possible subset
     * @tparam B the type of a successful response
     * @return a result whose `queried` is the size of `ps`, with the `Left` values
     *         collected into `failed` and the `Right` values into `responses`
     */
    def apply[B](ps: Seq[Either[ShardRegion.ShardId, B]], total: Int): ShardsQueryResult[B] = {
      val (t, r) = partition(ps)(identity)
      ShardsQueryResult(t.toSet, r, total, ps.size)
    }

    /**
     * Splits `ps` into the `Left` and the `Right` values produced by `f`,
     * each in the order in which they occur in `ps`.
     */
    def partition[T, A, B](ps: Seq[T])(f: T => Either[A, B]): (Seq[A], Seq[B]) = {
      // Elements are prepended while folding, hence the reversal at the end.
      val (a, b) = ps.foldLeft((Nil: Seq[A], Nil: Seq[B]))((xs, y) => prepend(xs, f(y)))
      (a.reverse, b.reverse)
    }

    /** Prepends `next` to the left or the right accumulator, depending on its side. */
    def prepend[A, B](acc: (Seq[A], Seq[B]), next: Either[A, B]): (Seq[A], Seq[B]) =
      next match {
        case Left(l)  => (l +: acc._1, acc._2)
        case Right(r) => (acc._1, r +: acc._2)
      }
  }
}

View file

@ -365,6 +365,9 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy
case (sid, no) =>
b.addStats(sm.MapFieldEntry.newBuilder().setKey(sid).setValue(no).build())
}
// Only add each failed shard id to the builder here: the message is built once,
// by the `b.build()` that follows, rather than built and discarded per element.
evt.failed.foreach { sid =>
  b.addFailed(sid)
}
b.build()
}
@ -373,9 +376,10 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy
shardRegionStatsFromProto(parsed)
}
private def shardRegionStatsFromProto(parsed: ClusterShardingMessages.ShardRegionStats) = {
private def shardRegionStatsFromProto(parsed: ClusterShardingMessages.ShardRegionStats): ShardRegionStats = {
val stats: Map[String, Int] = parsed.getStatsList.asScala.iterator.map(e => e.getKey -> e.getValue).toMap
ShardRegionStats(stats)
val failed: Set[String] = parsed.getFailedList.asScala.toSet
ShardRegionStats(stats, failed)
}
private def clusterShardingStatsToProto(evt: ClusterShardingStats): sm.ClusterShardingStats = {

View file

@ -4,15 +4,20 @@
package akka.cluster.sharding
import akka.actor._
import akka.cluster.{ Cluster, MemberStatus, MultiNodeClusterSpec }
import akka.remote.testconductor.RoleName
import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec }
import akka.testkit.{ TestDuration, TestProbe }
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import akka.actor._
import akka.cluster.Cluster
import akka.cluster.MemberStatus
import akka.cluster.MultiNodeClusterSpec
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.serialization.jackson.CborSerializable
import akka.testkit.TestDuration
import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory
object ClusterShardingGetStatsSpec {
case object Stop extends CborSerializable
@ -142,6 +147,7 @@ abstract class ClusterShardingGetStatsSpec
shardStats.regions.size should ===(3)
shardStats.regions.values.map(_.stats.size).sum should ===(0)
shardStats.regions.keys.forall(_.hasGlobalScope) should ===(true)
shardStats.regions.values.forall(_.failed.isEmpty) shouldBe true
}
}
@ -165,7 +171,7 @@ abstract class ClusterShardingGetStatsSpec
enterBarrier("sharded actors started")
}
"get shard state" in {
"get shard stats" in {
within(10.seconds) {
awaitAssert {
val probe = TestProbe()
@ -174,11 +180,11 @@ abstract class ClusterShardingGetStatsSpec
val regions = probe.expectMsgType[ShardRegion.ClusterShardingStats].regions
regions.size shouldEqual 3
regions.values.flatMap(_.stats.values).sum shouldEqual 4
regions.keys.forall(_.hasGlobalScope) should be(true)
regions.values.forall(_.failed.isEmpty) shouldBe true
regions.keys.forall(_.hasGlobalScope) shouldBe true
}
}
enterBarrier("got shard state")
system.log.info("got shard state")
enterBarrier("received shard stats")
}
"return stats after a node leaves" in {

View file

@ -0,0 +1,161 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.Props
import akka.cluster.MultiNodeClusterSpec
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.serialization.jackson.CborSerializable
import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.ScalaFutures
object ClusterShardingQueriesSpec {
case class Ping(id: Long) extends CborSerializable
case object Pong extends CborSerializable
class EntityActor extends Actor with ActorLogging {
def receive: Receive = {
case _: Ping => sender() ! Pong
}
}
val extractEntityId: ShardRegion.ExtractEntityId = {
case msg @ Ping(id) => (id.toString, msg)
}
val numberOfShards = 6
val extractShardId: ShardRegion.ExtractShardId = {
case Ping(id) => (id % numberOfShards).toString
}
val shardTypeName = "DatatypeA"
}
/**
 * Multi-node configuration for `ClusterShardingQueriesSpec`: one `controller` node
 * plus three nodes (`first`, `second`, `third`) that carry the "shard" role.
 */
object ClusterShardingQueriesSpecConfig extends MultiNodeConfig {
  val controller = role("controller")
  val first = role("first")
  val second = role("second")
  val third = role("third")

  // `shard-region-query-timeout = 0ms` is deliberate: the spec asserts that every
  // shard is reported as failed because none can reply within the timeout.
  // The LMDB directory is named after this spec so that it is not shared with
  // the directory used by ClusterShardingGetStatsSpec.
  commonConfig(
    debugConfig(on = false)
      .withFallback(ConfigFactory.parseString("""
        akka.loglevel = INFO
        akka.actor.provider = "cluster"
        akka.remote.classic.log-remote-lifecycle-events = off
        akka.log-dead-letters-during-shutdown = off
        akka.cluster.auto-down-unreachable-after = 0s
        akka.cluster.sharding {
          state-store-mode = "ddata"
          shard-region-query-timeout = 0ms
          updating-state-timeout = 2s
          waiting-for-state-timeout = 2s
        }
        akka.cluster.sharding.distributed-data.durable.lmdb {
          dir = target/ClusterShardingQueriesSpec/sharding-ddata
          map-size = 10 MiB
        }
        """).withFallback(MultiNodeClusterSpec.clusterConfig)))

  // Only these three nodes host shards; the controller does not get the role.
  nodeConfig(first, second, third)(ConfigFactory.parseString("""akka.cluster.roles=["shard"]"""))
}
class ClusterShardingQueriesSpecMultiJvmNode1 extends ClusterShardingQueriesSpec
class ClusterShardingQueriesSpecMultiJvmNode2 extends ClusterShardingQueriesSpec
class ClusterShardingQueriesSpecMultiJvmNode3 extends ClusterShardingQueriesSpec
class ClusterShardingQueriesSpecMultiJvmNode4 extends ClusterShardingQueriesSpec
abstract class ClusterShardingQueriesSpec
extends MultiNodeSpec(ClusterShardingQueriesSpecConfig)
with MultiNodeClusterSpec
with ScalaFutures {
import ClusterShardingQueriesSpec._
import ClusterShardingQueriesSpecConfig._
def startShard(): ActorRef = {
ClusterSharding(system).start(
typeName = shardTypeName,
entityProps = Props(new EntityActor),
settings = ClusterShardingSettings(system).withRole("shard"),
extractEntityId = extractEntityId,
extractShardId = extractShardId)
}
def startProxy(): ActorRef = {
ClusterSharding(system).startProxy(
typeName = shardTypeName,
role = Some("shard"),
extractEntityId = extractEntityId,
extractShardId = extractShardId)
}
lazy val region = ClusterSharding(system).shardRegion(shardTypeName)
"Querying cluster sharding" must {
"join cluster, initialize sharding" in {
awaitClusterUp(controller, first, second, third)
runOn(controller) {
startProxy()
}
runOn(first, second, third) {
startShard()
}
enterBarrier("sharding started")
}
"trigger sharded actors" in {
runOn(controller) {
within(10.seconds) {
awaitAssert {
val pingProbe = TestProbe()
(0 to 20).foreach(n => region.tell(Ping(n), pingProbe.ref))
pingProbe.receiveWhile(messages = 20) {
case Pong => ()
}
}
}
}
enterBarrier("sharded actors started")
}
"get ShardIds of shards that timed out per region" in {
runOn(roles: _*) {
val probe = TestProbe()
val region = ClusterSharding(system).shardRegion(shardTypeName)
region.tell(ShardRegion.GetClusterShardingStats(10.seconds), probe.ref)
val regions = probe.expectMsgType[ShardRegion.ClusterShardingStats].regions
regions.size shouldEqual 3
val timeouts = numberOfShards / regions.size
// 3 regions, 2 shards per region, all 2 shards/region were unresponsive
// within shard-region-query-timeout = 0ms
regions.values.forall { s =>
s.stats.isEmpty && s.failed.size == timeouts
} shouldBe true
regions.values.map(_.failed.size).sum shouldEqual numberOfShards
enterBarrier("received stats")
}
enterBarrier("done")
}
}
}

View file

@ -0,0 +1,54 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding
import akka.cluster.sharding.Shard.ShardStats
import akka.cluster.sharding.ShardRegion.ShardState
import akka.cluster.sharding.ShardingQueries.ShardsQueryResult
import akka.testkit.AkkaSpec
/**
 * Unit tests for `ShardsQueryResult`: partitioning of failed and successful
 * shard queries and the derived `isTotalFailed` / `isAllSubsetFailed` flags.
 */
class ShardingQueriesSpec extends AkkaSpec {

  private val shards = Seq("a", "b", "busy")
  private val timeouts = Set("busy")

  "ShardsQueryResult" must {

    "reflect nothing to acquire metadata from - 0 shards" in {
      val qr = ShardsQueryResult[ShardState](Seq.empty, 0)
      qr.total shouldEqual qr.queried
      qr.isTotalFailed shouldBe false // you'd have to make > 0 attempts in order to fail
      qr.isAllSubsetFailed shouldBe false // same
    }

    "partition failures and responses by type and by convention (failed Left T Right)" in {
      val responses = Seq(ShardStats("a", 1), ShardStats("b", 1))
      val results = responses.map(Right(_)) ++ timeouts.map(Left(_))
      val qr = ShardsQueryResult[ShardStats](results, shards.size)
      qr.failed shouldEqual timeouts
      qr.responses shouldEqual responses
      qr.isTotalFailed shouldBe false
      qr.isAllSubsetFailed shouldBe false
    }

    "detect a subset query - not all queried" in {
      val responses = Seq(ShardStats("a", 1), ShardStats("b", 1))
      val results = responses.map(Right(_)) ++ timeouts.map(Left(_))
      val qr = ShardsQueryResult[ShardStats](results, shards.size + 1)
      qr.isAllSubsetFailed shouldBe false // is subset, not all failed
      qr.total > qr.queried shouldBe true
      // `queried` is the number of results handed in; previously this line was a bare
      // comparison whose result was discarded, so nothing was actually asserted.
      qr.queried shouldEqual results.size
    }

    "partition when all failed" in {
      val results = Seq(Left("c"), Left("d"))
      val qr = ShardsQueryResult[ShardState](results, results.size)
      qr.total shouldEqual qr.queried
      qr.isTotalFailed shouldBe true
      qr.isAllSubsetFailed shouldBe false // not a subset
    }
  }
}

View file

@ -5,13 +5,16 @@
package akka.cluster.sharding.protobuf
import scala.concurrent.duration._
import akka.actor.Address
import akka.actor.ExtendedActorSystem
import akka.testkit.AkkaSpec
import akka.actor.Props
import akka.cluster.sharding.Shard
import akka.cluster.sharding.ShardCoordinator
import akka.cluster.sharding.ShardRegion
import akka.cluster.sharding.ShardRegion.ShardId
import akka.cluster.sharding.{ Shard, ShardCoordinator, ShardRegion }
import akka.serialization.SerializationExtension
import akka.testkit.AkkaSpec
class ClusterShardingMessageSerializerSpec extends AkkaSpec {
import ShardCoordinator.Internal._
@ -88,8 +91,8 @@ class ClusterShardingMessageSerializerSpec extends AkkaSpec {
}
"be able to serialize ShardRegionStats" in {
checkSerialization(ShardRegion.ShardRegionStats(Map.empty[ShardId, Int]))
checkSerialization(ShardRegion.ShardRegionStats(Map[ShardId, Int]("a" -> 23)))
checkSerialization(ShardRegion.ShardRegionStats(Map.empty[ShardId, Int], Set.empty[ShardId]))
checkSerialization(ShardRegion.ShardRegionStats(Map[ShardId, Int]("a" -> 23), Set("b")))
}
"be able to serialize StartEntity" in {
@ -107,8 +110,8 @@ class ClusterShardingMessageSerializerSpec extends AkkaSpec {
checkSerialization(ShardRegion.GetClusterShardingStats(3.seconds))
checkSerialization(
ShardRegion.ClusterShardingStats(Map(
Address("akka", "sys", "a", 2552) -> ShardRegion.ShardRegionStats(Map[ShardId, Int]("a" -> 23)),
Address("akka", "sys", "b", 2552) -> ShardRegion.ShardRegionStats(Map[ShardId, Int]("a" -> 23)))))
Address("akka", "sys", "a", 2552) -> ShardRegion.ShardRegionStats(Map[ShardId, Int]("a" -> 23), Set("b")),
Address("akka", "sys", "b", 2552) -> ShardRegion.ShardRegionStats(Map[ShardId, Int]("a" -> 23), Set("b")))))
}
}
}

View file

@ -523,7 +523,9 @@ the identifiers of the shards running in a Region and what entities are alive fo
`ShardRegion.GetClusterShardingStats` which will query all the regions in the cluster and return
a `ShardRegion.ClusterShardingStats` containing the identifiers of the shards running in each region and a count
of entities that are alive in each shard.
of entities that are alive in each shard. If any shard queries failed, for example because a shard was too busy
to reply within the configured `akka.cluster.sharding.shard-region-query-timeout`,
`ShardRegion.ClusterShardingStats` will also include, for each region, the set of identifiers of the shards that failed.
The type names of all started shards can be acquired via @scala[`ClusterSharding.shardTypeNames`] @java[`ClusterSharding.getShardTypeNames`].

View file

@ -364,6 +364,12 @@ akka.cluster.sharding.passivate-idle-entity-after = off
It is always disabled if @ref:[Remembering Entities](../cluster-sharding.md#remembering-entities) is enabled.
#### Cluster Sharding stats
A new field has been added to the response of a `ShardRegion.GetClusterShardingStats` command.
For each region, it contains the shards that failed or did not respond within the new configurable `akka.cluster.sharding.shard-region-query-timeout`.
This is described further in @ref:[inspecting sharding state](../cluster-sharding.md#inspecting-cluster-sharding-state).
### Distributed Data
Configuration properties for controlling sizes of `Gossip` and `DeltaPropagation` messages in Distributed Data