From 854d5b0c09b3d72c1441ea3000fa9f894bb58598 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andre=CC=81n?= Date: Fri, 26 Feb 2016 14:54:18 +0100 Subject: [PATCH] Stabilization of ClusterShardingGetStatsSpec, fix for #19863 --- .../ClusterShardingGetStatsSpec.scala | 68 +++++++++---------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala index ddcc6e8e65..036f10b41a 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGetStatsSpec.scala @@ -4,7 +4,7 @@ package akka.cluster.sharding import akka.actor._ -import akka.cluster.Cluster +import akka.cluster.{ MemberStatus, Cluster } import akka.cluster.ClusterEvent.CurrentClusterState import akka.remote.testconductor.RoleName import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec } @@ -19,7 +19,7 @@ object ClusterShardingGetStatsSpec { case object Pong class ShardedActor extends Actor with ActorLogging { - log.info(self.path.toString) + log.info(s"entity started {}", self.path) def receive = { case Stop ⇒ context.stop(self) case _: Ping ⇒ sender() ! Pong @@ -49,13 +49,15 @@ object ClusterShardingGetStatsSpecConfig extends MultiNodeConfig { akka.loglevel = INFO akka.actor.provider = "akka.cluster.ClusterActorRefProvider" akka.remote.log-remote-lifecycle-events = off + akka.log-dead-letters-during-shutdown = off akka.cluster.metrics.enabled = off akka.cluster.auto-down-unreachable-after = 0s akka.cluster.sharding { - coordinator-failure-backoff = 3s - shard-failure-backoff = 3s state-store-mode = "ddata" + updating-state-timeout = 2s + waiting-for-state-timeout = 2s } + akka.actor.warn-about-java-serializer-usage=false """)) nodeConfig(first, second, third)(ConfigFactory.parseString( @@ -99,7 +101,7 @@ abstract class ClusterShardingGetStatsSpec extends MultiNodeSpec(ClusterSharding enterBarrier(from.name + "-joined") } - var shardActor = Actor.noSender + lazy val region = ClusterSharding(system).shardRegion(shardTypeName) "Inspecting cluster sharding state" must { @@ -109,17 +111,18 @@ abstract class ClusterShardingGetStatsSpec extends MultiNodeSpec(ClusterSharding join(second) join(third) - // make sure all nodes has joined - awaitAssert { - Cluster(system).sendCurrentClusterState(testActor) - expectMsgType[CurrentClusterState].members.size === 3 + // make sure all nodes are up + within(10.seconds) { + awaitAssert { + Cluster(system).state.members.count(_.status == MemberStatus.Up) should === (4) + } } runOn(controller) { startProxy() } runOn(first, second, third) { - shardActor = startShard() + startShard() } enterBarrier("sharding started") @@ -130,12 +133,11 @@ abstract class ClusterShardingGetStatsSpec extends MultiNodeSpec(ClusterSharding within(10.seconds) { awaitAssert { val probe = TestProbe() - val region = ClusterSharding(system).shardRegion(shardTypeName) region.tell(ShardRegion.GetClusterShardingStats(10.seconds.dilated), probe.ref) val shardStats = probe.expectMsgType[ShardRegion.ClusterShardingStats] - shardStats.regions.size shouldEqual 3 - shardStats.regions.values.map(_.stats.size).sum shouldEqual 0 - shardStats.regions.keys.forall(_.hasGlobalScope) should be(true) + shardStats.regions.size should ===(3) + shardStats.regions.values.map(_.stats.size).sum should ===(0) + shardStats.regions.keys.forall(_.hasGlobalScope) should ===(true) } } @@ -144,23 +146,19 @@ abstract class ClusterShardingGetStatsSpec extends MultiNodeSpec(ClusterSharding "trigger sharded actors" in { runOn(controller) { - val region = ClusterSharding(system).shardRegion(shardTypeName) - within(10.seconds) { awaitAssert { val pingProbe = TestProbe() // trigger starting of 2 entities on first and second node // but leave third node without entities - (1 to 6).filterNot(_ % 3 == 0).foreach(n ⇒ region.tell(Ping(n), pingProbe.ref)) + List(1, 2, 4, 6).foreach(n ⇒ region.tell(Ping(n), pingProbe.ref)) pingProbe.receiveWhile(messages = 4) { case Pong ⇒ () } } } } - enterBarrier("sharded actors started") - } "get shard state" in { @@ -175,29 +173,32 @@ abstract class ClusterShardingGetStatsSpec extends MultiNodeSpec(ClusterSharding regions.keys.forall(_.hasGlobalScope) should be(true) } } - enterBarrier("got shard state") - + system.log.info("got shard state") } "return stats after a node leaves" in { - runOn(first) { + runOn(controller) { Cluster(system).leave(node(third).address) } - runOn(third) { - watch(shardActor) - expectTerminated(shardActor, 15.seconds) + runOn(controller, first, second) { + within(30.seconds) { + awaitAssert { + Cluster(system).state.members.size should ===(3) + } + } } enterBarrier("third node removed") + system.log.info("third node removed") - runOn(first, second) { + runOn(controller) { within(10.seconds) { awaitAssert { val pingProbe = TestProbe() - // trigger the same four shards - (1 to 6).filterNot(_ % 3 == 0).foreach(n ⇒ shardActor.tell(Ping(n), pingProbe.ref)) + // make sure we have the 4 entities still alive across the fewer nodes + List(1, 2, 4, 6).foreach(n ⇒ region.tell(Ping(n), pingProbe.ref)) pingProbe.receiveWhile(messages = 4) { case Pong ⇒ () } @@ -207,15 +208,14 @@ abstract class ClusterShardingGetStatsSpec extends MultiNodeSpec(ClusterSharding enterBarrier("shards revived") - runOn(first, second) { - within(10.seconds) { + runOn(controller) { + within(20.seconds) { awaitAssert { val probe = TestProbe() - val region = ClusterSharding(system).shardRegion(shardTypeName) - region.tell(ShardRegion.GetClusterShardingStats(10.seconds.dilated), probe.ref) + region.tell(ShardRegion.GetClusterShardingStats(20.seconds.dilated), probe.ref) val regions = probe.expectMsgType[ShardRegion.ClusterShardingStats].regions - regions.size shouldEqual 2 - regions.values.flatMap(_.stats.values).sum shouldEqual 4 + regions.size === 2 + regions.values.flatMap(_.stats.values).sum should ===(4) } } }