diff --git a/akka-cluster-sharding/src/main/java/akka/cluster/sharding/protobuf/msg/ClusterShardingMessages.java b/akka-cluster-sharding/src/main/java/akka/cluster/sharding/protobuf/msg/ClusterShardingMessages.java
index 3b7b041741..85baad966a 100644
--- a/akka-cluster-sharding/src/main/java/akka/cluster/sharding/protobuf/msg/ClusterShardingMessages.java
+++ b/akka-cluster-sharding/src/main/java/akka/cluster/sharding/protobuf/msg/ClusterShardingMessages.java
@@ -6190,6 +6190,26 @@ public final class ClusterShardingMessages {
*/
akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.MapFieldEntryOrBuilder getStatsOrBuilder(
int index);
+
+ // repeated string failed = 2;
+ /**
+ * repeated string failed = 2;
+ */
+ java.util.List<java.lang.String>
+ getFailedList();
+ /**
+ * repeated string failed = 2;
+ */
+ int getFailedCount();
+ /**
+ * repeated string failed = 2;
+ */
+ java.lang.String getFailed(int index);
+ /**
+ * repeated string failed = 2;
+ */
+ akka.protobuf.ByteString
+ getFailedBytes(int index);
}
/**
* Protobuf type {@code ShardRegionStats}
@@ -6250,6 +6270,14 @@ public final class ClusterShardingMessages {
stats_.add(input.readMessage(akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.MapFieldEntry.PARSER, extensionRegistry));
break;
}
+ case 18: {
+ if (!((mutable_bitField0_ & 0x00000002) == 0x00000002)) {
+ failed_ = new akka.protobuf.LazyStringArrayList();
+ mutable_bitField0_ |= 0x00000002;
+ }
+ failed_.add(input.readBytes());
+ break;
+ }
}
}
} catch (akka.protobuf.InvalidProtocolBufferException e) {
@@ -6261,6 +6289,9 @@ public final class ClusterShardingMessages {
if (((mutable_bitField0_ & 0x00000001) == 0x00000001)) {
stats_ = java.util.Collections.unmodifiableList(stats_);
}
+ if (((mutable_bitField0_ & 0x00000002) == 0x00000002)) {
+ failed_ = new akka.protobuf.UnmodifiableLazyStringList(failed_);
+ }
this.unknownFields = unknownFields.build();
makeExtensionsImmutable();
}
@@ -6328,8 +6359,39 @@ public final class ClusterShardingMessages {
return stats_.get(index);
}
+ // repeated string failed = 2;
+ public static final int FAILED_FIELD_NUMBER = 2;
+ private akka.protobuf.LazyStringList failed_;
+ /**
+ * repeated string failed = 2;
+ */
+ public java.util.List<java.lang.String>
+ getFailedList() {
+ return failed_;
+ }
+ /**
+ * repeated string failed = 2;
+ */
+ public int getFailedCount() {
+ return failed_.size();
+ }
+ /**
+ * repeated string failed = 2;
+ */
+ public java.lang.String getFailed(int index) {
+ return failed_.get(index);
+ }
+ /**
+ * repeated string failed = 2;
+ */
+ public akka.protobuf.ByteString
+ getFailedBytes(int index) {
+ return failed_.getByteString(index);
+ }
+
private void initFields() {
stats_ = java.util.Collections.emptyList();
+ failed_ = akka.protobuf.LazyStringArrayList.EMPTY;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -6346,6 +6408,9 @@ public final class ClusterShardingMessages {
for (int i = 0; i < stats_.size(); i++) {
output.writeMessage(1, stats_.get(i));
}
+ for (int i = 0; i < failed_.size(); i++) {
+ output.writeBytes(2, failed_.getByteString(i));
+ }
getUnknownFields().writeTo(output);
}
@@ -6359,6 +6424,15 @@ public final class ClusterShardingMessages {
size += akka.protobuf.CodedOutputStream
.computeMessageSize(1, stats_.get(i));
}
+ {
+ int dataSize = 0;
+ for (int i = 0; i < failed_.size(); i++) {
+ dataSize += akka.protobuf.CodedOutputStream
+ .computeBytesSizeNoTag(failed_.getByteString(i));
+ }
+ size += dataSize;
+ size += 1 * getFailedList().size();
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -6482,6 +6556,8 @@ public final class ClusterShardingMessages {
} else {
statsBuilder_.clear();
}
+ failed_ = akka.protobuf.LazyStringArrayList.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000002);
return this;
}
@@ -6518,6 +6594,12 @@ public final class ClusterShardingMessages {
} else {
result.stats_ = statsBuilder_.build();
}
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ failed_ = new akka.protobuf.UnmodifiableLazyStringList(
+ failed_);
+ bitField0_ = (bitField0_ & ~0x00000002);
+ }
+ result.failed_ = failed_;
onBuilt();
return result;
}
@@ -6559,6 +6641,16 @@ public final class ClusterShardingMessages {
}
}
}
+ if (!other.failed_.isEmpty()) {
+ if (failed_.isEmpty()) {
+ failed_ = other.failed_;
+ bitField0_ = (bitField0_ & ~0x00000002);
+ } else {
+ ensureFailedIsMutable();
+ failed_.addAll(other.failed_);
+ }
+ onChanged();
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -6826,6 +6918,99 @@ public final class ClusterShardingMessages {
return statsBuilder_;
}
+ // repeated string failed = 2;
+ private akka.protobuf.LazyStringList failed_ = akka.protobuf.LazyStringArrayList.EMPTY;
+ private void ensureFailedIsMutable() {
+ if (!((bitField0_ & 0x00000002) == 0x00000002)) {
+ failed_ = new akka.protobuf.LazyStringArrayList(failed_);
+ bitField0_ |= 0x00000002;
+ }
+ }
+ /**
+ * repeated string failed = 2;
+ */
+ public java.util.List<java.lang.String>
+ getFailedList() {
+ return java.util.Collections.unmodifiableList(failed_);
+ }
+ /**
+ * repeated string failed = 2;
+ */
+ public int getFailedCount() {
+ return failed_.size();
+ }
+ /**
+ * repeated string failed = 2;
+ */
+ public java.lang.String getFailed(int index) {
+ return failed_.get(index);
+ }
+ /**
+ * repeated string failed = 2;
+ */
+ public akka.protobuf.ByteString
+ getFailedBytes(int index) {
+ return failed_.getByteString(index);
+ }
+ /**
+ * repeated string failed = 2;
+ */
+ public Builder setFailed(
+ int index, java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureFailedIsMutable();
+ failed_.set(index, value);
+ onChanged();
+ return this;
+ }
+ /**
+ * repeated string failed = 2;
+ */
+ public Builder addFailed(
+ java.lang.String value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureFailedIsMutable();
+ failed_.add(value);
+ onChanged();
+ return this;
+ }
+ /**
+ * repeated string failed = 2;
+ */
+ public Builder addAllFailed(
+ java.lang.Iterable<java.lang.String> values) {
+ ensureFailedIsMutable();
+ super.addAll(values, failed_);
+ onChanged();
+ return this;
+ }
+ /**
+ * repeated string failed = 2;
+ */
+ public Builder clearFailed() {
+ failed_ = akka.protobuf.LazyStringArrayList.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000002);
+ onChanged();
+ return this;
+ }
+ /**
+ * repeated string failed = 2;
+ */
+ public Builder addFailedBytes(
+ akka.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ ensureFailedIsMutable();
+ failed_.add(value);
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:ShardRegionStats)
}
@@ -12033,20 +12218,21 @@ public final class ClusterShardingMessages {
"ties\030\001 \003(\t\"!\n\rEntityStarted\022\020\n\010entityId\030" +
"\001 \002(\t\"!\n\rEntityStopped\022\020\n\010entityId\030\001 \002(\t" +
"\"0\n\nShardStats\022\r\n\005shard\030\001 \002(\t\022\023\n\013entityC" +
- "ount\030\002 \002(\005\"1\n\020ShardRegionStats\022\035\n\005stats\030" +
- "\001 \003(\0132\016.MapFieldEntry\"+\n\rMapFieldEntry\022\013" +
- "\n\003key\030\001 \001(\t\022\r\n\005value\030\002 \001(\005\"/\n\027GetCluster" +
- "ShardingStats\022\024\n\014timeoutNanos\030\001 \002(\003\"A\n\024C" +
- "lusterShardingStats\022)\n\005stats\030\001 \003(\0132\032.Clu" +
- "sterShardingStatsEntry\"X\n\031ClusterShardin" +
- "gStatsEntry\022\031\n\007address\030\001 \002(\0132\010.Address\022 ",
- "\n\005stats\030\002 \002(\0132\021.ShardRegionStats\"+\n\016Curr" +
- "entRegions\022\031\n\007regions\030\001 \003(\0132\010.Address\"K\n" +
- "\007Address\022\020\n\010protocol\030\001 \002(\t\022\016\n\006system\030\002 \002" +
- "(\t\022\020\n\010hostname\030\003 \002(\t\022\014\n\004port\030\004 \002(\r\"\037\n\013St" +
- "artEntity\022\020\n\010entityId\030\001 \002(\t\"3\n\016StartEnti" +
- "tyAck\022\020\n\010entityId\030\001 \002(\t\022\017\n\007shardId\030\002 \002(\t" +
- "B&\n\"akka.cluster.sharding.protobuf.msgH\001"
+ "ount\030\002 \002(\005\"A\n\020ShardRegionStats\022\035\n\005stats\030" +
+ "\001 \003(\0132\016.MapFieldEntry\022\016\n\006failed\030\002 \003(\t\"+\n" +
+ "\rMapFieldEntry\022\013\n\003key\030\001 \001(\t\022\r\n\005value\030\002 \001" +
+ "(\005\"/\n\027GetClusterShardingStats\022\024\n\014timeout" +
+ "Nanos\030\001 \002(\003\"A\n\024ClusterShardingStats\022)\n\005s" +
+ "tats\030\001 \003(\0132\032.ClusterShardingStatsEntry\"X" +
+ "\n\031ClusterShardingStatsEntry\022\031\n\007address\030\001",
+ " \002(\0132\010.Address\022 \n\005stats\030\002 \002(\0132\021.ShardReg" +
+ "ionStats\"+\n\016CurrentRegions\022\031\n\007regions\030\001 " +
+ "\003(\0132\010.Address\"K\n\007Address\022\020\n\010protocol\030\001 \002" +
+ "(\t\022\016\n\006system\030\002 \002(\t\022\020\n\010hostname\030\003 \002(\t\022\014\n\004" +
+ "port\030\004 \002(\r\"\037\n\013StartEntity\022\020\n\010entityId\030\001 " +
+ "\002(\t\"3\n\016StartEntityAck\022\020\n\010entityId\030\001 \002(\t\022" +
+ "\017\n\007shardId\030\002 \002(\tB&\n\"akka.cluster.shardin" +
+ "g.protobuf.msgH\001"
};
akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -12118,7 +12304,7 @@ public final class ClusterShardingMessages {
internal_static_ShardRegionStats_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_ShardRegionStats_descriptor,
- new java.lang.String[] { "Stats", });
+ new java.lang.String[] { "Stats", "Failed", });
internal_static_MapFieldEntry_descriptor =
getDescriptor().getMessageTypes().get(10);
internal_static_MapFieldEntry_fieldAccessorTable = new
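
The regenerated classes expose the new `failed` field through the usual repeated-field accessors and builder methods shown above. A minimal round-trip sketch, written in Scala against the generated Java API (hypothetical values, assuming the regenerated classes are on the classpath):

```scala
import akka.cluster.sharding.protobuf.msg.ClusterShardingMessages.{ MapFieldEntry, ShardRegionStats }

// Build a ShardRegionStats message with one shard count and one failed shard id.
val proto: ShardRegionStats = ShardRegionStats
  .newBuilder()
  .addStats(MapFieldEntry.newBuilder().setKey("7").setValue(42).build())
  .addFailed("3") // the new `repeated string failed = 2` field
  .build()

// Round-trip through bytes, as the cluster sharding serializer does.
val restored = ShardRegionStats.parseFrom(proto.toByteArray)
assert(restored.getFailedCount == 1 && restored.getFailed(0) == "3")
```
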
diff --git a/akka-cluster-sharding/src/main/mima-filters/2.5.x.backwards.excludes b/akka-cluster-sharding/src/main/mima-filters/2.5.x.backwards.excludes
index 4146a2b32e..11fadf550f 100644
--- a/akka-cluster-sharding/src/main/mima-filters/2.5.x.backwards.excludes
+++ b/akka-cluster-sharding/src/main/mima-filters/2.5.x.backwards.excludes
@@ -3,3 +3,12 @@ ProblemFilters.exclude[Problem]("akka.cluster.sharding.Shard.*")
# #25191
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardRegion.retryTask")
+
+# #27100 Productionize: GetShardRegionStats returns empty shard set on ask timeout
+# askAllShards, an internal function, was renamed and changed to query either all shards or a subset, so that only failed shards need to be retried
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardRegion.askAllShards")
+# Added a new field for failed shard queries to ShardRegion#ShardRegionStats, converted it from a case class to a plain class and updated the proto definition
+ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.sharding.protobuf.msg.ClusterShardingMessages#ShardRegionStatsOrBuilder.getFailedList")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.sharding.protobuf.msg.ClusterShardingMessages#ShardRegionStatsOrBuilder.getFailed")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.sharding.protobuf.msg.ClusterShardingMessages#ShardRegionStatsOrBuilder.getFailedBytes")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.sharding.protobuf.msg.ClusterShardingMessages#ShardRegionStatsOrBuilder.getFailedCount")
diff --git a/akka-cluster-sharding/src/main/protobuf/ClusterShardingMessages.proto b/akka-cluster-sharding/src/main/protobuf/ClusterShardingMessages.proto
index ae040a60cf..2ebb4b6037 100644
--- a/akka-cluster-sharding/src/main/protobuf/ClusterShardingMessages.proto
+++ b/akka-cluster-sharding/src/main/protobuf/ClusterShardingMessages.proto
@@ -55,6 +55,7 @@ message ShardStats {
message ShardRegionStats {
repeated MapFieldEntry stats = 1;
+ repeated string failed = 2;
}
message MapFieldEntry {
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala
index 03610a4e9f..cddc4d95e3 100755
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala
@@ -8,10 +8,11 @@ import java.net.URLEncoder
import java.util.Optional
import java.util.concurrent.ConcurrentHashMap
-import akka.util.ccompat.JavaConverters._
import scala.collection.immutable
import scala.concurrent.Await
import scala.util.control.NonFatal
+
+import akka.util.ccompat.JavaConverters._
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.ActorSystem
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala
index 10da263c70..32647c50cd 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala
@@ -252,7 +252,7 @@ private[akka] class Shard(
def processChange[E <: StateChange](event: E)(handler: E => Unit): Unit =
handler(event)
- def receive = receiveCommand
+ def receive: Receive = receiveCommand
// Don't send back ShardInitialized so that messages are buffered in the ShardRegion
// while awaiting the lease
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
index 5eae72e246..3369557ecc 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
@@ -6,24 +6,27 @@ package akka.cluster.sharding
import java.net.URLEncoder
-import akka.pattern.AskTimeoutException
-import akka.util.{ MessageBufferMap, PrettyDuration, Timeout }
-import akka.pattern.{ ask, pipe }
-import akka.actor._
-import akka.cluster.Cluster
-import akka.cluster.ClusterEvent._
-import akka.cluster.Member
-import akka.cluster.MemberStatus
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.Future
import scala.reflect.ClassTag
import scala.concurrent.Promise
+import scala.runtime.AbstractFunction1
+import scala.util.Success
+import scala.util.Failure
import akka.Done
import akka.annotation.InternalApi
+import akka.actor._
+import akka.cluster.Cluster
+import akka.cluster.ClusterEvent._
+import akka.cluster.Member
+import akka.cluster.MemberStatus
import akka.cluster.ClusterSettings
import akka.cluster.ClusterSettings.DataCenter
+import akka.cluster.sharding.Shard.ShardStats
+import akka.pattern.{ ask, pipe }
+import akka.util.{ MessageBufferMap, PrettyDuration, Timeout }
/**
* @see [[ClusterSharding$ ClusterSharding extension]]
@@ -293,8 +296,14 @@ object ShardRegion {
*/
def getRegionStatsInstance = GetShardRegionStats
- @SerialVersionUID(1L) final case class ShardRegionStats(stats: Map[ShardId, Int])
- extends ClusterShardingSerializable {
+ /**
+ *
+ * @param stats the region stats mapping of `ShardId` to number of entities
+ * @param failed the set of shards, if any, that failed to respond within the timeout
+ */
+ @SerialVersionUID(1L) final class ShardRegionStats(val stats: Map[ShardId, Int], val failed: Set[ShardId])
+ extends ClusterShardingSerializable
+ with Product {
/**
* Java API
@@ -304,6 +313,38 @@ object ShardRegion {
stats.asJava
}
+ /** Java API */
+ def getFailed(): java.util.Set[ShardId] = {
+ import akka.util.ccompat.JavaConverters._
+ failed.asJava
+ }
+
+ // For binary compatibility
+ def this(stats: Map[ShardId, Int]) = this(stats, Set.empty[ShardId])
+ private[sharding] def copy(stats: Map[ShardId, Int] = stats): ShardRegionStats =
+ new ShardRegionStats(stats, this.failed)
+
+ // For binary compatibility: class conversion from case class
+ override def equals(other: Any): Boolean = other match {
+ case o: ShardRegionStats => o.stats == stats && o.failed == failed
+ case _ => false
+ }
+ override def hashCode: Int = stats.## + failed.##
+ override def toString: String = s"ShardRegionStats[stats=$stats, failed=$failed]"
+ override def productArity: Int = 2
+ override def productElement(n: Int) =
+ if (n == 0) stats else if (n == 1) failed else throw new NoSuchElementException
+ override def canEqual(o: Any): Boolean = o.isInstanceOf[ShardRegionStats]
+
+ }
+ // For binary compatibility
+ object ShardRegionStats extends AbstractFunction1[Map[ShardId, Int], ShardRegionStats] {
+ def apply(stats: Map[ShardId, Int]): ShardRegionStats =
+ apply(stats, Set.empty[ShardId])
+ def apply(stats: Map[ShardId, Int], failed: Set[ShardId]): ShardRegionStats =
+ new ShardRegionStats(stats, failed)
+ def unapply(stats: ShardRegionStats): Option[Map[ShardId, Int]] =
+ Option(stats.stats)
}
/**
@@ -395,7 +436,7 @@ object ShardRegion {
var remaining = entities
- def receive = {
+ def receive: Receive = {
case ReceiveTimeout =>
log.warning(
"HandOffStopMessage[{}] is not handled by some of the entities of the `{}` shard, " +
@@ -450,6 +491,7 @@ private[akka] class ShardRegion(
with ActorLogging
with Timers {
+ import ShardingQueries.ShardsQueryResult
import ShardCoordinator.Internal._
import ShardRegion._
import settings._
@@ -694,15 +736,15 @@ private[akka] class ShardRegion(
case None => sender() ! CurrentRegions(Set.empty)
}
+ case msg: GetClusterShardingStats =>
+ coordinator.fold(sender ! ClusterShardingStats(Map.empty))(_.forward(msg))
+
case GetShardRegionState =>
replyToRegionStateQuery(sender())
case GetShardRegionStats =>
replyToRegionStatsQuery(sender())
- case msg: GetClusterShardingStats =>
- coordinator.fold(sender ! ClusterShardingStats(Map.empty))(_.forward(msg))
-
case _ => unhandled(query)
}
@@ -738,38 +780,52 @@ private[akka] class ShardRegion(
}
def replyToRegionStateQuery(ref: ActorRef): Unit = {
- askAllShards[Shard.CurrentShardState](Shard.GetCurrentShardState)
- .map { shardStates =>
- CurrentShardRegionState(shardStates.map {
- case (shardId, state) => ShardRegion.ShardState(shardId, state.entityIds)
- }.toSet)
- }
- .recover {
- case _: AskTimeoutException => CurrentShardRegionState(Set.empty)
+ queryShards[Shard.CurrentShardState](shards, Shard.GetCurrentShardState)
+ .map { qr =>
+ // Productionize CurrentShardRegionState #27406
+ val state =
+ qr.responses.map(state => ShardRegion.ShardState(state.shardId, state.entityIds)) ++
+ qr.failed.map(sid => ShardRegion.ShardState(sid, Set.empty))
+ CurrentShardRegionState(state.toSet)
}
.pipeTo(ref)
}
def replyToRegionStatsQuery(ref: ActorRef): Unit = {
- askAllShards[Shard.ShardStats](Shard.GetShardStats)
- .map { shardStats =>
- ShardRegionStats(shardStats.map {
- case (shardId, stats) => (shardId, stats.entityCount)
- }.toMap)
- }
- .recover {
- case _: AskTimeoutException => ShardRegionStats(Map.empty)
+ queryShards[ShardStats](shards, Shard.GetShardStats)
+ .map { qr =>
+ ShardRegionStats(qr.responses.map(stats => (stats.shardId, stats.entityCount)).toMap, qr.failed)
}
.pipeTo(ref)
}
- def askAllShards[T: ClassTag](msg: Any): Future[Seq[(ShardId, T)]] = {
+ /**
+ * Query all or a subset of shards, e.g. unresponsive shards that initially timed out.
+ * If the number of `shards` is less than this.shards.size, this could be a retry.
+ * Returns a partitioned set of the shards that did not reply within the
+ * timeout and those that did reply, so a retry can target only the failed subset.
+ *
+ * Logs a warning if any of the group timed out.
+ *
+ * To query only an unresponsive subset: {{{ queryShards[T](shards.filterKeys(unresponsive.contains), shardQuery) }}}
+ */
+ def queryShards[T: ClassTag](shards: Map[ShardId, ActorRef], msg: Any): Future[ShardsQueryResult[T]] = {
implicit val timeout: Timeout = settings.shardRegionQueryTimeout
- Future.sequence(shards.toSeq.map {
- case (shardId, ref) => (ref ? msg).mapTo[T].map(t => (shardId, t))
- })
+
+ Future.traverse(shards.toSeq) { case (shardId, shard) => askOne(shard, msg, shardId) }.map { ps =>
+ val qr = ShardsQueryResult[T](ps, this.shards.size)
+ if (qr.failed.nonEmpty) log.warning(qr.toString)
+ qr
+ }
}
+ private def askOne[T: ClassTag](shard: ActorRef, msg: Any, shardId: ShardId)(
+ implicit timeout: Timeout): Future[Either[ShardId, T]] =
+ (shard ? msg).mapTo[T].transform {
+ case Success(t) => Success(Right(t))
+ case Failure(_) => Success(Left(shardId))
+ }
+
private def tryCompleteGracefulShutdown() =
if (gracefulShutdownInProgress && shards.isEmpty && shardBuffers.isEmpty) {
context.stop(self) // all shards have been rebalanced, complete graceful shutdown
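
The effect of the new `queryShards`/`askOne` pair is visible to callers of `GetShardRegionStats`: instead of collapsing to an empty map when a single ask times out, the reply now carries the counts from responsive shards plus the ids of unresponsive ones. A hedged usage sketch from application code (the `region` reference, timeout and logging are assumptions, not part of this patch):

```scala
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import akka.cluster.sharding.ShardRegion
import akka.cluster.sharding.ShardRegion.ShardRegionStats

// `region` is the ActorRef returned by ClusterSharding(system).shardRegion(typeName).
def logRegionStats(region: ActorRef)(implicit ec: ExecutionContext): Unit = {
  implicit val timeout: Timeout = 5.seconds
  (region ? ShardRegion.GetShardRegionStats).mapTo[ShardRegionStats].foreach { stats =>
    // stats.stats: entity count per shard that replied in time
    // stats.failed: shard ids that did not reply within shard-region-query-timeout
    if (stats.failed.nonEmpty)
      println(s"Shards ${stats.failed.mkString("[", ", ", "]")} did not reply; counts so far: ${stats.stats}")
    else
      println(s"Entity counts per shard: ${stats.stats}")
  }
}
```
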
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardingQueries.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardingQueries.scala
new file mode 100644
index 0000000000..c83435a510
--- /dev/null
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardingQueries.scala
@@ -0,0 +1,73 @@
+/*
+ * Copyright (C) 2019 Lightbend Inc.
+ */
+
+package akka.cluster.sharding
+
+import akka.annotation.InternalApi
+
+/** INTERNAL API */
+@InternalApi
+private[sharding] object ShardingQueries {
+
+ /**
+ * INTERNAL API
+ * The result of a group query and metadata.
+ *
+ * @param failed the shards that failed to reply or did not reply within the
+ * configured timeout. This can indicate several states, for example a shard
+ * that is still initializing, restarting, or heavily loaded and busy; in
+ * those cases returning zero entities would not convey the actual reason
+ * @param responses the responses received from the query
+ * @param total the total number of shards tracked versus a possible subset
+ * @param queried the number of shards queried, which could equal the total or be a
+ * subset if this was a retry of those that timed out
+ * @tparam B the type of the responses collected from the shards
+ */
+ final case class ShardsQueryResult[B](failed: Set[ShardRegion.ShardId], responses: Seq[B], total: Int, queried: Int) {
+
+ /** Returns true if there was anything to query. */
+ private val nonEmpty: Boolean = total > 0 && queried > 0
+
+ /** Returns true if there was anything to query, all were queried and all failed within the timeout. */
+ def isTotalFailed: Boolean = nonEmpty && failed.size == total
+
+ /** Returns true if there was a subset to query and all in that subset failed within the timeout. */
+ def isAllSubsetFailed: Boolean = nonEmpty && queried < total && failed.size == queried
+
+ override val toString: String = {
+ if (total == 0)
+ s"Shard region had zero shards to gather metadata from."
+ else if (isTotalFailed || isAllSubsetFailed) {
+ s"All [${failed.size}] shards ${if (isAllSubsetFailed) "of subset" else ""} queried failed within the timeout."
+ } else {
+ s"Queried [$queried] shards of [$total]: responsive [${responses.size}], failed [${failed.size}] within the timeout."
+ }
+ }
+ }
+ object ShardsQueryResult {
+
+ /**
+ * @param ps the partitioned results for the actors queried: those that did not
+ * reply within the timeout (or otherwise failed) and those that replied
+ * @param total the total number of actors tracked versus a possible subset
+ * @tparam B the type of the responses collected from the actors
+ */
+ def apply[B](ps: Seq[Either[ShardRegion.ShardId, B]], total: Int): ShardsQueryResult[B] = {
+ val (t, r) = partition(ps)(identity)
+ ShardsQueryResult(t.toSet, r, total, ps.size)
+ }
+
+ def partition[T, A, B](ps: Seq[T])(f: T => Either[A, B]): (Seq[A], Seq[B]) = {
+ val (a, b) = ps.foldLeft((Nil: Seq[A], Nil: Seq[B]))((xs, y) => prepend(xs, f(y)))
+ (a.reverse, b.reverse)
+ }
+
+ def prepend[A, B](acc: (Seq[A], Seq[B]), next: Either[A, B]): (Seq[A], Seq[B]) =
+ next match {
+ case Left(l) => (l +: acc._1, acc._2)
+ case Right(r) => (acc._1, r +: acc._2)
+ }
+ }
+}
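
`ShardsQueryResult` is internal, but its partitioning is easy to see on plain values: `Left` carries the id of a shard that did not reply, `Right` carries a response, and the counts feed the log line built in `toString`. A small illustration with made-up values (it only compiles inside the `akka.cluster.sharding` package, since the object is `private[sharding]`):

```scala
import akka.cluster.sharding.ShardingQueries.ShardsQueryResult

// Three shards tracked and queried; shard "2" timed out, the other two answered.
val results: Seq[Either[String, Int]] = Seq(Right(10), Left("2"), Right(7))
val qr = ShardsQueryResult[Int](results, total = 3)

assert(qr.failed == Set("2"))
assert(qr.responses == Seq(10, 7))
assert(!qr.isTotalFailed && !qr.isAllSubsetFailed)
// qr.toString: "Queried [3] shards of [3]: responsive [2], failed [1] within the timeout."
```
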
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala
index c08d0a126d..e540139464 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala
@@ -365,6 +365,9 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy
case (sid, no) =>
b.addStats(sm.MapFieldEntry.newBuilder().setKey(sid).setValue(no).build())
}
+ evt.failed.foreach { sid =>
+ b.addFailed(sid)
+ }
b.build()
}
@@ -373,9 +376,10 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy
shardRegionStatsFromProto(parsed)
}
- private def shardRegionStatsFromProto(parsed: ClusterShardingMessages.ShardRegionStats) = {
+ private def shardRegionStatsFromProto(parsed: ClusterShardingMessages.ShardRegionStats): ShardRegionStats = {
val stats: Map[String, Int] = parsed.getStatsList.asScala.iterator.map(e => e.getKey -> e.getValue).toMap
- ShardRegionStats(stats)
+ val failed: Set[String] = parsed.getFailedList.asScala.toSet
+ ShardRegionStats(stats, failed)
}
private def clusterShardingStatsToProto(evt: ClusterShardingStats): sm.ClusterShardingStats = {
diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala
index e2dafbba1a..3eb245ca1a 100644
--- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala
+++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala
@@ -4,15 +4,20 @@
package akka.cluster.sharding
-import akka.actor._
-import akka.cluster.{ Cluster, MemberStatus, MultiNodeClusterSpec }
-import akka.remote.testconductor.RoleName
-import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec }
-import akka.testkit.{ TestDuration, TestProbe }
-import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
+import akka.actor._
+import akka.cluster.Cluster
+import akka.cluster.MemberStatus
+import akka.cluster.MultiNodeClusterSpec
+import akka.remote.testconductor.RoleName
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.remote.testkit.STMultiNodeSpec
import akka.serialization.jackson.CborSerializable
+import akka.testkit.TestDuration
+import akka.testkit.TestProbe
+import com.typesafe.config.ConfigFactory
object ClusterShardingGetStatsSpec {
case object Stop extends CborSerializable
@@ -142,6 +147,7 @@ abstract class ClusterShardingGetStatsSpec
shardStats.regions.size should ===(3)
shardStats.regions.values.map(_.stats.size).sum should ===(0)
shardStats.regions.keys.forall(_.hasGlobalScope) should ===(true)
+ shardStats.regions.values.forall(_.failed.isEmpty) shouldBe true
}
}
@@ -165,7 +171,7 @@ abstract class ClusterShardingGetStatsSpec
enterBarrier("sharded actors started")
}
- "get shard state" in {
+ "get shard stats" in {
within(10.seconds) {
awaitAssert {
val probe = TestProbe()
@@ -174,11 +180,11 @@ abstract class ClusterShardingGetStatsSpec
val regions = probe.expectMsgType[ShardRegion.ClusterShardingStats].regions
regions.size shouldEqual 3
regions.values.flatMap(_.stats.values).sum shouldEqual 4
- regions.keys.forall(_.hasGlobalScope) should be(true)
+ regions.values.forall(_.failed.isEmpty) shouldBe true
+ regions.keys.forall(_.hasGlobalScope) shouldBe true
}
}
- enterBarrier("got shard state")
- system.log.info("got shard state")
+ enterBarrier("received shard stats")
}
"return stats after a node leaves" in {
diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingQueriesSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingQueriesSpec.scala
new file mode 100644
index 0000000000..79d7c9dc24
--- /dev/null
+++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingQueriesSpec.scala
@@ -0,0 +1,161 @@
+/*
+ * Copyright (C) 2019 Lightbend Inc.
+ */
+
+package akka.cluster.sharding
+
+import scala.concurrent.duration._
+
+import akka.actor.Actor
+import akka.actor.ActorLogging
+import akka.actor.ActorRef
+import akka.actor.Props
+import akka.cluster.MultiNodeClusterSpec
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.serialization.jackson.CborSerializable
+import akka.testkit.TestProbe
+import com.typesafe.config.ConfigFactory
+import org.scalatest.concurrent.ScalaFutures
+
+object ClusterShardingQueriesSpec {
+ case class Ping(id: Long) extends CborSerializable
+ case object Pong extends CborSerializable
+
+ class EntityActor extends Actor with ActorLogging {
+ def receive: Receive = {
+ case _: Ping => sender() ! Pong
+ }
+ }
+
+ val extractEntityId: ShardRegion.ExtractEntityId = {
+ case msg @ Ping(id) => (id.toString, msg)
+ }
+
+ val numberOfShards = 6
+
+ val extractShardId: ShardRegion.ExtractShardId = {
+ case Ping(id) => (id % numberOfShards).toString
+ }
+
+ val shardTypeName = "DatatypeA"
+}
+
+object ClusterShardingQueriesSpecConfig extends MultiNodeConfig {
+
+ val controller = role("controller")
+ val first = role("first")
+ val second = role("second")
+ val third = role("third")
+
+ commonConfig(
+ debugConfig(on = false)
+ .withFallback(ConfigFactory.parseString("""
+ akka.loglevel = INFO
+ akka.actor.provider = "cluster"
+ akka.remote.classic.log-remote-lifecycle-events = off
+ akka.log-dead-letters-during-shutdown = off
+ akka.cluster.auto-down-unreachable-after = 0s
+ akka.cluster.sharding {
+ state-store-mode = "ddata"
+ shard-region-query-timeout = 0ms
+ updating-state-timeout = 2s
+ waiting-for-state-timeout = 2s
+ }
+ akka.cluster.sharding.distributed-data.durable.lmdb {
+ dir = target/ClusterShardingGetStatsSpec/sharding-ddata
+ map-size = 10 MiB
+ }
+ """).withFallback(MultiNodeClusterSpec.clusterConfig)))
+
+ nodeConfig(first, second, third)(ConfigFactory.parseString("""akka.cluster.roles=["shard"]"""))
+
+}
+
+class ClusterShardingQueriesSpecMultiJvmNode1 extends ClusterShardingQueriesSpec
+class ClusterShardingQueriesSpecMultiJvmNode2 extends ClusterShardingQueriesSpec
+class ClusterShardingQueriesSpecMultiJvmNode3 extends ClusterShardingQueriesSpec
+class ClusterShardingQueriesSpecMultiJvmNode4 extends ClusterShardingQueriesSpec
+
+abstract class ClusterShardingQueriesSpec
+ extends MultiNodeSpec(ClusterShardingQueriesSpecConfig)
+ with MultiNodeClusterSpec
+ with ScalaFutures {
+
+ import ClusterShardingQueriesSpec._
+ import ClusterShardingQueriesSpecConfig._
+
+ def startShard(): ActorRef = {
+ ClusterSharding(system).start(
+ typeName = shardTypeName,
+ entityProps = Props(new EntityActor),
+ settings = ClusterShardingSettings(system).withRole("shard"),
+ extractEntityId = extractEntityId,
+ extractShardId = extractShardId)
+ }
+
+ def startProxy(): ActorRef = {
+ ClusterSharding(system).startProxy(
+ typeName = shardTypeName,
+ role = Some("shard"),
+ extractEntityId = extractEntityId,
+ extractShardId = extractShardId)
+ }
+
+ lazy val region = ClusterSharding(system).shardRegion(shardTypeName)
+
+ "Querying cluster sharding" must {
+
+ "join cluster, initialize sharding" in {
+ awaitClusterUp(controller, first, second, third)
+
+ runOn(controller) {
+ startProxy()
+ }
+
+ runOn(first, second, third) {
+ startShard()
+ }
+
+ enterBarrier("sharding started")
+ }
+
+ "trigger sharded actors" in {
+ runOn(controller) {
+ within(10.seconds) {
+ awaitAssert {
+ val pingProbe = TestProbe()
+ (0 to 20).foreach(n => region.tell(Ping(n), pingProbe.ref))
+ pingProbe.receiveWhile(messages = 20) {
+ case Pong => ()
+ }
+ }
+ }
+ }
+ enterBarrier("sharded actors started")
+ }
+
+ "get ShardIds of shards that timed out per region" in {
+ runOn(roles: _*) {
+ val probe = TestProbe()
+ val region = ClusterSharding(system).shardRegion(shardTypeName)
+ region.tell(ShardRegion.GetClusterShardingStats(10.seconds), probe.ref)
+ val regions = probe.expectMsgType[ShardRegion.ClusterShardingStats].regions
+ regions.size shouldEqual 3
+ val timeouts = numberOfShards / regions.size
+
+ // 3 regions, 2 shards per region, all 2 shards/region were unresponsive
+ // within shard-region-query-timeout = 0ms
+ regions.values.forall { s =>
+ s.stats.isEmpty && s.failed.size == timeouts
+ } shouldBe true
+
+ regions.values.map(_.failed.size).sum shouldEqual numberOfShards
+ enterBarrier("received stats")
+ }
+ enterBarrier("done")
+ }
+
+ }
+
+}
diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ShardingQueriesSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ShardingQueriesSpec.scala
new file mode 100644
index 0000000000..24f8144b36
--- /dev/null
+++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ShardingQueriesSpec.scala
@@ -0,0 +1,54 @@
+/*
+ * Copyright (C) 2019 Lightbend Inc.
+ */
+
+package akka.cluster.sharding
+
+import akka.cluster.sharding.Shard.ShardStats
+import akka.cluster.sharding.ShardRegion.ShardState
+import akka.cluster.sharding.ShardingQueries.ShardsQueryResult
+import akka.testkit.AkkaSpec
+
+class ShardingQueriesSpec extends AkkaSpec {
+
+ private val shards = Seq("a", "b", "busy")
+ private val timeouts = Set("busy")
+
+ "ShardsQueryResult" must {
+
+ "reflect nothing to acquire metadata from - 0 shards" in {
+ val qr = ShardsQueryResult[ShardState](Seq.empty, 0)
+ qr.total shouldEqual qr.queried
+ qr.isTotalFailed shouldBe false // you'd have to make > 0 attempts in order to fail
+ qr.isAllSubsetFailed shouldBe false // same
+ }
+
+ "partition failures and responses by type and by convention (failed Left T Right)" in {
+ val responses = Seq(ShardStats("a", 1), ShardStats("b", 1))
+ val results = responses.map(Right(_)) ++ timeouts.map(Left(_))
+ val qr = ShardsQueryResult[ShardStats](results, shards.size)
+ qr.failed shouldEqual timeouts
+ qr.responses shouldEqual responses
+ qr.isTotalFailed shouldBe false
+ qr.isAllSubsetFailed shouldBe false
+ }
+
+ "detect a subset query - not all queried" in {
+ val responses = Seq(ShardStats("a", 1), ShardStats("b", 1))
+ val results = responses.map(Right(_)) ++ timeouts.map(Left(_))
+ val qr = ShardsQueryResult[ShardStats](results, shards.size + 1)
+ qr.isAllSubsetFailed shouldBe false // is subset, not all failed
+ qr.total > qr.queried shouldBe true
+ qr.queried shouldEqual shards.size
+ }
+
+ "partition when all failed" in {
+ val results = Seq(Left("c"), Left("d"))
+ val qr = ShardsQueryResult[ShardState](results, results.size)
+ qr.total shouldEqual qr.queried
+ qr.isTotalFailed shouldBe true
+ qr.isAllSubsetFailed shouldBe false // not a subset
+ }
+ }
+
+}
diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializerSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializerSpec.scala
index 32d30e0171..073abf3ba4 100644
--- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializerSpec.scala
+++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializerSpec.scala
@@ -5,13 +5,16 @@
package akka.cluster.sharding.protobuf
import scala.concurrent.duration._
+
import akka.actor.Address
import akka.actor.ExtendedActorSystem
-import akka.testkit.AkkaSpec
import akka.actor.Props
+import akka.cluster.sharding.Shard
+import akka.cluster.sharding.ShardCoordinator
+import akka.cluster.sharding.ShardRegion
import akka.cluster.sharding.ShardRegion.ShardId
-import akka.cluster.sharding.{ Shard, ShardCoordinator, ShardRegion }
import akka.serialization.SerializationExtension
+import akka.testkit.AkkaSpec
class ClusterShardingMessageSerializerSpec extends AkkaSpec {
import ShardCoordinator.Internal._
@@ -88,8 +91,8 @@ class ClusterShardingMessageSerializerSpec extends AkkaSpec {
}
"be able to serialize ShardRegionStats" in {
- checkSerialization(ShardRegion.ShardRegionStats(Map.empty[ShardId, Int]))
- checkSerialization(ShardRegion.ShardRegionStats(Map[ShardId, Int]("a" -> 23)))
+ checkSerialization(ShardRegion.ShardRegionStats(Map.empty[ShardId, Int], Set.empty[ShardId]))
+ checkSerialization(ShardRegion.ShardRegionStats(Map[ShardId, Int]("a" -> 23), Set("b")))
}
"be able to serialize StartEntity" in {
@@ -107,8 +110,8 @@ class ClusterShardingMessageSerializerSpec extends AkkaSpec {
checkSerialization(ShardRegion.GetClusterShardingStats(3.seconds))
checkSerialization(
ShardRegion.ClusterShardingStats(Map(
- Address("akka", "sys", "a", 2552) -> ShardRegion.ShardRegionStats(Map[ShardId, Int]("a" -> 23)),
- Address("akka", "sys", "b", 2552) -> ShardRegion.ShardRegionStats(Map[ShardId, Int]("a" -> 23)))))
+ Address("akka", "sys", "a", 2552) -> ShardRegion.ShardRegionStats(Map[ShardId, Int]("a" -> 23), Set("b")),
+ Address("akka", "sys", "b", 2552) -> ShardRegion.ShardRegionStats(Map[ShardId, Int]("a" -> 23), Set("b")))))
}
}
}
diff --git a/akka-docs/src/main/paradox/cluster-sharding.md b/akka-docs/src/main/paradox/cluster-sharding.md
index dd49c3f8bf..31f62e70c1 100644
--- a/akka-docs/src/main/paradox/cluster-sharding.md
+++ b/akka-docs/src/main/paradox/cluster-sharding.md
@@ -523,7 +523,9 @@ the identifiers of the shards running in a Region and what entities are alive fo
`ShardRegion.GetClusterShardingStats` which will query all the regions in the cluster and return
a `ShardRegion.ClusterShardingStats` containing the identifiers of the shards running in each region and a count
-of entities that are alive in each shard.
+of entities that are alive in each shard. If any shard queries failed, for example because a shard
+was too busy to reply within the configured `akka.cluster.sharding.shard-region-query-timeout`,
+`ShardRegion.ClusterShardingStats` will also include the set of shard identifiers per region that failed.
The type names of all started shards can be acquired via @scala[`ClusterSharding.shardTypeNames`] @java[`ClusterSharding.getShardTypeNames`].
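
A hedged sketch of reading the new information from application code (the `region` reference and the timeouts are assumptions, not part of the documented API change):

```scala
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import akka.cluster.sharding.ShardRegion
import akka.cluster.sharding.ShardRegion.ClusterShardingStats

def reportClusterShardingStats(region: ActorRef)(implicit ec: ExecutionContext): Unit = {
  implicit val askTimeout: Timeout = 15.seconds
  (region ? ShardRegion.GetClusterShardingStats(10.seconds)).mapTo[ClusterShardingStats].foreach { stats =>
    stats.regions.foreach {
      case (address, regionStats) =>
        // regionStats.failed lists the shards of this region that did not reply
        // within akka.cluster.sharding.shard-region-query-timeout
        println(s"$address: entities per shard ${regionStats.stats}, unresponsive shards ${regionStats.failed}")
    }
  }
}
```
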
diff --git a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md
index cc1c14faba..30b31d5004 100644
--- a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md
+++ b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md
@@ -364,6 +364,12 @@ akka.cluster.sharding.passivate-idle-entity-after = off
It is always disabled if @ref:[Remembering Entities](../cluster-sharding.md#remembering-entities) is enabled.
+#### Cluster Sharding stats
+
+A new field has been added to the per-region stats returned for a `ShardRegion.GetClusterShardingStats` query,
+listing any shards that failed to respond within the new configurable `akka.cluster.sharding.shard-region-query-timeout`.
+This is described further in @ref:[inspecting sharding state](../cluster-sharding.md#inspecting-cluster-sharding-state).
+
### Distributed Data
Configuration properties for controlling sizes of `Gossip` and `DeltaPropagation` messages in Distributed Data