harden ClusterShardingQueries with awaitAssert (#31157)
This commit is contained in:
parent
536bb2a3e6
commit
331db9b8f8
1 changed files with 23 additions and 20 deletions
|
|
@ -8,7 +8,6 @@ import scala.concurrent.duration._
|
|||
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest.concurrent.ScalaFutures
|
||||
|
||||
import akka.actor.Props
|
||||
import akka.testkit.TestProbe
|
||||
|
||||
|
|
@ -29,8 +28,7 @@ object ClusterShardingQueriesSpec {
|
|||
val shardTypeName = "DatatypeA"
|
||||
}
|
||||
|
||||
object ClusterShardingQueriesSpecConfig
|
||||
extends MultiNodeClusterShardingConfig(additionalConfig = """
|
||||
object ClusterShardingQueriesSpecConfig extends MultiNodeClusterShardingConfig(additionalConfig = s"""
|
||||
akka.log-dead-letters-during-shutdown = off
|
||||
akka.cluster.sharding {
|
||||
shard-region-query-timeout = 2ms
|
||||
|
|
@ -113,15 +111,17 @@ abstract class ClusterShardingQueriesSpec
|
|||
runOn(busy, second, third) {
|
||||
val probe = TestProbe()
|
||||
val region = ClusterSharding(system).shardRegion(shardTypeName)
|
||||
region.tell(ShardRegion.GetClusterShardingStats(10.seconds), probe.ref)
|
||||
val regions = probe.expectMsgType[ShardRegion.ClusterShardingStats].regions
|
||||
regions.size shouldEqual 3
|
||||
val timeouts = numberOfShards / regions.size
|
||||
awaitAssert({
|
||||
region.tell(ShardRegion.GetClusterShardingStats(10.seconds), probe.ref)
|
||||
val regions = probe.expectMsgType[ShardRegion.ClusterShardingStats].regions
|
||||
regions.size shouldEqual 3
|
||||
val timeouts = numberOfShards / regions.size
|
||||
|
||||
// 3 regions, 2 shards per region, all 2 shards/region were unresponsive
|
||||
// within shard-region-query-timeout, which only on first is 0ms
|
||||
regions.values.map(_.stats.size).sum shouldEqual 4
|
||||
regions.values.map(_.failed.size).sum shouldEqual timeouts
|
||||
// 3 regions, 2 shards per region, all 2 shards/region were unresponsive
|
||||
// within shard-region-query-timeout, which only on first is 0ms
|
||||
regions.values.map(_.stats.size).sum shouldEqual 4
|
||||
regions.values.map(_.failed.size).sum shouldEqual timeouts
|
||||
}, max = 10.seconds)
|
||||
}
|
||||
enterBarrier("received failed stats from timed out shards vs empty")
|
||||
}
|
||||
|
|
@ -130,21 +130,24 @@ abstract class ClusterShardingQueriesSpec
|
|||
runOn(busy) {
|
||||
val probe = TestProbe()
|
||||
val region = ClusterSharding(system).shardRegion(shardTypeName)
|
||||
region.tell(ShardRegion.GetShardRegionState, probe.ref)
|
||||
val state = probe.expectMsgType[ShardRegion.CurrentShardRegionState]
|
||||
state.shards.isEmpty shouldEqual true
|
||||
state.failed.size shouldEqual 2
|
||||
awaitAssert({
|
||||
region.tell(ShardRegion.GetShardRegionState, probe.ref)
|
||||
val state = probe.expectMsgType[ShardRegion.CurrentShardRegionState]
|
||||
state.shards.isEmpty shouldEqual true
|
||||
state.failed.size shouldEqual 2
|
||||
}, max = 10.seconds)
|
||||
}
|
||||
enterBarrier("query-timeout-on-busy-node")
|
||||
|
||||
runOn(second, third) {
|
||||
val probe = TestProbe()
|
||||
val region = ClusterSharding(system).shardRegion(shardTypeName)
|
||||
|
||||
region.tell(ShardRegion.GetShardRegionState, probe.ref)
|
||||
val state = probe.expectMsgType[ShardRegion.CurrentShardRegionState]
|
||||
state.shards.size shouldEqual 2
|
||||
state.failed.isEmpty shouldEqual true
|
||||
awaitAssert({
|
||||
region.tell(ShardRegion.GetShardRegionState, probe.ref)
|
||||
val state = probe.expectMsgType[ShardRegion.CurrentShardRegionState]
|
||||
state.shards.size shouldEqual 2
|
||||
state.failed.isEmpty shouldEqual true
|
||||
}, max = 10.seconds)
|
||||
}
|
||||
enterBarrier("done")
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue