Harden MultiDcClusterShardingSpec (#25201)

- Use the global multi-node cluster config (see the layering sketch below)
- Reduce the retry interval for ShardRegion registration
- Add a clue to the otherwise unhelpful assertion failure
Christopher Batey, 2018-06-15 14:28:04 +01:00, committed by Johan Andrén
parent ee421b48bc
commit 28b86379c8
2 changed files with 22 additions and 22 deletions
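
In effect, the first two bullets come down to config layering: the spec now declares only its own overrides (the shorter akka.cluster.sharding.retry-interval and the verbose cluster debug logging) and pulls everything else, such as akka.actor.provider, from the shared MultiNodeClusterSpec.clusterConfig via withFallback. Below is a minimal, self-contained sketch of how that layering behaves with the Typesafe Config API; baseClusterConfig is only an illustrative stand-in for the shared config, not code from this commit.

```scala
import com.typesafe.config.{ Config, ConfigFactory }

object ConfigLayeringSketch {
  // Illustrative stand-in for the shared MultiNodeClusterSpec.clusterConfig.
  val baseClusterConfig: Config = ConfigFactory.parseString("""
    akka.actor.provider = "cluster"
    akka.cluster.sharding.retry-interval = 2s
    """)

  // Spec-specific settings win; anything not set here falls back to the shared config.
  val specConfig: Config = ConfigFactory.parseString("""
    akka.cluster.sharding.retry-interval = 200ms
    akka.cluster.debug.verbose-heartbeat-logging = on
    """).withFallback(baseClusterConfig)

  def main(args: Array[String]): Unit = {
    println(specConfig.getString("akka.cluster.sharding.retry-interval")) // 200ms (override)
    println(specConfig.getString("akka.actor.provider"))                  // cluster (fallback)
  }
}
```

withFallback merges path by path, so only the keys the spec sets explicitly shadow the shared defaults.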

MultiDcClusterShardingSpec:

```diff
@@ -5,13 +5,11 @@
 package akka.cluster.sharding
 
 import scala.concurrent.duration._
 
 import akka.actor.Actor
 import akka.actor.ActorRef
 import akka.actor.Address
 import akka.actor.Props
-import akka.cluster.Cluster
-import akka.cluster.MemberStatus
+import akka.cluster.{ Cluster, MemberStatus, MultiNodeClusterSpec }
 import akka.cluster.sharding.ShardRegion.CurrentRegions
 import akka.cluster.sharding.ShardRegion.GetCurrentRegions
 import akka.remote.testconductor.RoleName
@@ -56,12 +54,16 @@ object MultiDcClusterShardingSpecConfig extends MultiNodeConfig {
   commonConfig(ConfigFactory.parseString(s"""
     akka.loglevel = DEBUG # issue #23741
-    akka.cluster.debug.verbose-heartbeat-logging = on
-    akka.cluster.debug.verbose-gossip-logging = on
-    akka.actor.provider = "cluster"
+    akka.cluster {
+      debug.verbose-heartbeat-logging = on
+      debug.verbose-gossip-logging = on
+      auto-down-unreachable-after = 0s
+      sharding {
+        retry-interval = 200ms
+      }
+    }
     akka.remote.log-remote-lifecycle-events = on
-    akka.cluster.auto-down-unreachable-after = 0s
-    """))
+    """).withFallback(MultiNodeClusterSpec.clusterConfig))
 
   nodeConfig(first, second) {
     ConfigFactory.parseString("akka.cluster.multi-data-center.self-data-center = DC1")
@@ -72,20 +74,16 @@ object MultiDcClusterShardingSpecConfig extends MultiNodeConfig {
   }
 }
 
-class MultiDcClusterShardingMultiJvmNode1 extends MultiDcClusterShardingSpec
-class MultiDcClusterShardingMultiJvmNode2 extends MultiDcClusterShardingSpec
-class MultiDcClusterShardingMultiJvmNode3 extends MultiDcClusterShardingSpec
-class MultiDcClusterShardingMultiJvmNode4 extends MultiDcClusterShardingSpec
+class MultiDcClusterShardingSpecMultiJvmNode1 extends MultiDcClusterShardingSpec
+class MultiDcClusterShardingSpecMultiJvmNode2 extends MultiDcClusterShardingSpec
+class MultiDcClusterShardingSpecMultiJvmNode3 extends MultiDcClusterShardingSpec
+class MultiDcClusterShardingSpecMultiJvmNode4 extends MultiDcClusterShardingSpec
 
-abstract class MultiDcClusterShardingSpec extends MultiNodeSpec(MultiDcClusterShardingSpecConfig)
+abstract class MultiDcClusterShardingSpec extends MultiNodeSpec(MultiDcClusterShardingSpecConfig) with MultiNodeClusterSpec
   with STMultiNodeSpec with ImplicitSender {
 
   import MultiDcClusterShardingSpec._
   import MultiDcClusterShardingSpecConfig._
 
-  override def initialParticipants = roles.size
-
-  val cluster = Cluster(system)
-
   def join(from: RoleName, to: RoleName): Unit = {
     runOn(from) {
       cluster join node(to).address
@@ -131,8 +129,10 @@ abstract class MultiDcClusterShardingSpec extends MultiNodeSpec(MultiDcClusterSh
       join(fourth, first)
 
       awaitAssert({
-        Cluster(system).state.members.size should ===(4)
-        Cluster(system).state.members.map(_.status) should ===(Set(MemberStatus.Up))
+        withClue(s"Members: ${Cluster(system).state}") {
+          Cluster(system).state.members.size should ===(4)
+          Cluster(system).state.members.map(_.status) should ===(Set(MemberStatus.Up))
+        }
       }, 10.seconds)
 
       runOn(first, second) {
```
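
The withClue wrapper added in the last hunk above is plain ScalaTest: the clue is prepended to the failure message of any assertion that fails inside the block, so a failing membership check reports the full cluster state rather than a bare size mismatch. A minimal, Akka-free sketch of that behaviour; the member set here is made up for illustration:

```scala
import org.scalatest.Assertions._

object WithClueSketch {
  def main(args: Array[String]): Unit = {
    val members = Set("node-1", "node-2", "node-3") // pretend cluster membership

    try {
      // The clue is prepended to whatever assertion message is thrown inside the block.
      withClue(s"Members: $members ") {
        assertResult(4)(members.size)
      }
    } catch {
      // Prints something like: "Members: Set(node-1, node-2, node-3) Expected 4, but got 3"
      case e: org.scalatest.exceptions.TestFailedException => println(e.getMessage)
    }
  }
}
```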

MultiNodeClusterSpec:

```diff
@@ -95,7 +95,7 @@ object MultiNodeClusterSpec {
 trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoroner with FlightRecordingSupport { self: MultiNodeSpec ⇒
 
-  override def initialParticipants = roles.size
+  final override def initialParticipants = roles.size
 
   private val cachedAddresses = new ConcurrentHashMap[RoleName, Address]
@@ -240,14 +240,14 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro
     def memberInState(member: Address, status: Seq[MemberStatus]): Boolean =
       clusterView.members.exists { m ⇒ (m.address == member) && status.contains(m.status) }
 
-    cluster join joinNode
+    cluster.join(joinNode)
     awaitCond({
       clusterView.refreshCurrentState()
       if (memberInState(joinNode, List(MemberStatus.up)) &&
         memberInState(myself, List(MemberStatus.Joining, MemberStatus.Up)))
         true
       else {
-        cluster join joinNode
+        cluster.join(joinNode)
         false
       }
     }, max, interval)
```
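
The join in the second hunk runs inside awaitCond from Akka's TestKit, which re-evaluates the block every interval until it returns true or max elapses (and then fails the test); that is what lets the spec simply issue cluster.join again while the node is not yet Up. A minimal sketch of that polling; the time-based condition here is a hypothetical stand-in for the membership check:

```scala
import akka.actor.ActorSystem
import akka.testkit.TestKit
import scala.concurrent.duration._

object AwaitCondSketch {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("AwaitCondSketch")
    val testKit = new TestKit(system)

    // Dummy condition: stands in for "joinNode is Up and myself is Joining or Up".
    val ready = System.nanoTime() + 500.millis.toNanos

    // The block is evaluated every `interval` until it is true; throws after `max`.
    testKit.awaitCond(System.nanoTime() > ready, max = 3.seconds, interval = 200.millis)

    TestKit.shutdownActorSystem(system)
  }
}
```

In the diff above, max and interval come from the enclosing method's parameters rather than being hard-coded.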