Stabilization of ClusterShardingGetStatsSpec, fix for #19863

Johan Andrén 2016-02-26 14:54:18 +01:00
parent d63ba52da3
commit 854d5b0c09


@@ -4,7 +4,7 @@
package akka.cluster.sharding
import akka.actor._
-import akka.cluster.Cluster
+import akka.cluster.{ MemberStatus, Cluster }
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.remote.testconductor.RoleName
import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec }
@@ -19,7 +19,7 @@ object ClusterShardingGetStatsSpec {
case object Pong
class ShardedActor extends Actor with ActorLogging {
-log.info(self.path.toString)
+log.info(s"entity started {}", self.path)
def receive = {
case Stop ⇒ context.stop(self)
case _: Ping ⇒ sender() ! Pong
@@ -49,13 +49,15 @@ object ClusterShardingGetStatsSpecConfig extends MultiNodeConfig {
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.log-dead-letters-during-shutdown = off
akka.cluster.metrics.enabled = off
akka.cluster.auto-down-unreachable-after = 0s
akka.cluster.sharding {
coordinator-failure-backoff = 3s
shard-failure-backoff = 3s
state-store-mode = "ddata"
updating-state-timeout = 2s
waiting-for-state-timeout = 2s
}
akka.actor.warn-about-java-serializer-usage=false
"""))
nodeConfig(first, second, third)(ConfigFactory.parseString(
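
Note (not part of the diff above): with state-store-mode = "ddata" the sharding coordinator keeps its state in Distributed Data instead of persistence, so the multi-jvm test needs no shared journal. A minimal sketch of reading that setting back, assuming a throwaway ActorSystem named "Sketch":

    import akka.actor.ActorSystem
    import akka.cluster.sharding.ClusterShardingSettings
    import com.typesafe.config.ConfigFactory

    val system = ActorSystem("Sketch", ConfigFactory.parseString(
      """
      akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
      akka.cluster.sharding.state-store-mode = "ddata"
      """).withFallback(ConfigFactory.load()))
    // ClusterShardingSettings is populated from akka.cluster.sharding.* in the system config
    val settings = ClusterShardingSettings(system)
    assert(settings.stateStoreMode == "ddata")
    system.terminate()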
@@ -99,7 +101,7 @@ abstract class ClusterShardingGetStatsSpec extends MultiNodeSpec(ClusterSharding
enterBarrier(from.name + "-joined")
}
-var shardActor = Actor.noSender
+lazy val region = ClusterSharding(system).shardRegion(shardTypeName)
"Inspecting cluster sharding state" must {
@@ -109,17 +111,18 @@ abstract class ClusterShardingGetStatsSpec extends MultiNodeSpec(ClusterSharding
join(second)
join(third)
-// make sure all nodes has joined
-awaitAssert {
-Cluster(system).sendCurrentClusterState(testActor)
-expectMsgType[CurrentClusterState].members.size === 3
+// make sure all nodes are up
+within(10.seconds) {
+awaitAssert {
+Cluster(system).state.members.count(_.status == MemberStatus.Up) should === (4)
}
}
runOn(controller) {
startProxy()
}
runOn(first, second, third) {
-shardActor = startShard()
+startShard()
}
enterBarrier("sharding started")
@@ -130,12 +133,11 @@ abstract class ClusterShardingGetStatsSpec extends MultiNodeSpec(ClusterSharding
within(10.seconds) {
awaitAssert {
val probe = TestProbe()
-val region = ClusterSharding(system).shardRegion(shardTypeName)
region.tell(ShardRegion.GetClusterShardingStats(10.seconds.dilated), probe.ref)
val shardStats = probe.expectMsgType[ShardRegion.ClusterShardingStats]
-shardStats.regions.size shouldEqual 3
-shardStats.regions.values.map(_.stats.size).sum shouldEqual 0
-shardStats.regions.keys.forall(_.hasGlobalScope) should be(true)
+shardStats.regions.size should ===(3)
+shardStats.regions.values.map(_.stats.size).sum should ===(0)
+shardStats.regions.keys.forall(_.hasGlobalScope) should ===(true)
}
}
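
For reference (not part of the commit), the same stats query can be issued with ask outside a test; a rough sketch where the type name "Counter" and the implicit ActorSystem are assumptions:

    import akka.pattern.ask
    import akka.util.Timeout
    import akka.cluster.sharding.{ ClusterSharding, ShardRegion }
    import scala.concurrent.Future
    import scala.concurrent.duration._

    implicit val timeout: Timeout = 10.seconds
    val statsRegion = ClusterSharding(system).shardRegion("Counter")
    // GetClusterShardingStats asks every running region for its shard-to-entity-count map
    val stats: Future[ShardRegion.ClusterShardingStats] =
      (statsRegion ? ShardRegion.GetClusterShardingStats(10.seconds))
        .mapTo[ShardRegion.ClusterShardingStats]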
@@ -144,23 +146,19 @@ abstract class ClusterShardingGetStatsSpec extends MultiNodeSpec(ClusterSharding
"trigger sharded actors" in {
runOn(controller) {
-val region = ClusterSharding(system).shardRegion(shardTypeName)
within(10.seconds) {
awaitAssert {
val pingProbe = TestProbe()
// trigger starting of 2 entities on first and second node
// but leave third node without entities
-(1 to 6).filterNot(_ % 3 == 0).foreach(n ⇒ region.tell(Ping(n), pingProbe.ref))
+List(1, 2, 4, 6).foreach(n ⇒ region.tell(Ping(n), pingProbe.ref))
pingProbe.receiveWhile(messages = 4) {
case Pong ⇒ ()
}
}
}
}
enterBarrier("sharded actors started")
}
"get shard state" in {
@@ -175,29 +173,32 @@ abstract class ClusterShardingGetStatsSpec extends MultiNodeSpec(ClusterSharding
regions.keys.forall(_.hasGlobalScope) should be(true)
}
}
enterBarrier("got shard state")
system.log.info("got shard state")
}
"return stats after a node leaves" in {
-runOn(first) {
+runOn(controller) {
Cluster(system).leave(node(third).address)
}
-runOn(third) {
-watch(shardActor)
-expectTerminated(shardActor, 15.seconds)
+runOn(controller, first, second) {
+within(30.seconds) {
+awaitAssert {
+Cluster(system).state.members.size should ===(3)
}
}
}
enterBarrier("third node removed")
system.log.info("third node removed")
-runOn(first, second) {
+runOn(controller) {
within(10.seconds) {
awaitAssert {
val pingProbe = TestProbe()
-// trigger the same four shards
-(1 to 6).filterNot(_ % 3 == 0).foreach(n ⇒ shardActor.tell(Ping(n), pingProbe.ref))
+// make sure we have the 4 entities still alive across the fewer nodes
+List(1, 2, 4, 6).foreach(n ⇒ region.tell(Ping(n), pingProbe.ref))
pingProbe.receiveWhile(messages = 4) {
case Pong ⇒ ()
}
@@ -207,15 +208,14 @@ abstract class ClusterShardingGetStatsSpec extends MultiNodeSpec(ClusterSharding
enterBarrier("shards revived")
-runOn(first, second) {
-within(10.seconds) {
+runOn(controller) {
+within(20.seconds) {
awaitAssert {
val probe = TestProbe()
-val region = ClusterSharding(system).shardRegion(shardTypeName)
-region.tell(ShardRegion.GetClusterShardingStats(10.seconds.dilated), probe.ref)
+region.tell(ShardRegion.GetClusterShardingStats(20.seconds.dilated), probe.ref)
val regions = probe.expectMsgType[ShardRegion.ClusterShardingStats].regions
-regions.size shouldEqual 2
-regions.values.flatMap(_.stats.values).sum shouldEqual 4
+regions.size should ===(2)
+regions.values.flatMap(_.stats.values).sum should ===(4)
}
}
}
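
Related sketch (not in this diff): besides the cluster-wide stats query, the local region can be asked for its own shard state with ShardRegion.GetShardRegionState, which replies with the shards and entity ids hosted by that region; assuming the spec's region value and test kit are in scope:

    import akka.cluster.sharding.ShardRegion

    val stateProbe = TestProbe()
    region.tell(ShardRegion.GetShardRegionState, stateProbe.ref)
    val state = stateProbe.expectMsgType[ShardRegion.CurrentShardRegionState]
    // each ShardState holds a shard id plus the ids of the entities running in that shard
    state.shards.foreach(s ⇒ system.log.info("shard {} hosts {}", s.shardId, s.entityIds))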