New base cluster sharding multi-node/jvm test to cleanup and tighten testing (#27740)

This commit is contained in:
Helena Edelson 2019-09-19 08:14:48 -07:00 committed by GitHub
parent e49edff4e8
commit 20238d975f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 192 additions and 1 deletions

View file

@ -68,7 +68,6 @@ abstract class ClusterShardingRememberEntitiesSpecConfig(val mode: String, val r
commonConfig( commonConfig(
modeConfig modeConfig
.withFallback(ConfigFactory.parseString(s""" .withFallback(ConfigFactory.parseString(s"""
akka.loglevel = DEBUG
akka.actor.provider = "cluster" akka.actor.provider = "cluster"
akka.cluster.auto-down-unreachable-after = 0s akka.cluster.auto-down-unreachable-after = 0s
akka.remote.log-remote-lifecycle-events = off akka.remote.log-remote-lifecycle-events = off

View file

@ -0,0 +1,58 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding
import akka.cluster.MultiNodeClusterSpec
import akka.persistence.journal.leveldb.SharedLeveldbJournal
import akka.remote.testkit.MultiNodeConfig
import akka.testkit.AkkaSpec
import com.typesafe.config.{ Config, ConfigFactory }
/**
* A MultiNodeConfig for ClusterSharding. Implement the roles, etc. and create with the following:
*
* @param mode the state store mode
* @param rememberEntities defaults to off
* @param overrides additional config
* @param loglevel defaults to INFO
*/
abstract class MultiNodeClusterShardingConfig(
val mode: String = ClusterShardingSettings.StateStoreModeDData,
val rememberEntities: Boolean = false,
overrides: Config = ConfigFactory.empty,
loglevel: String = "INFO")
extends MultiNodeConfig {
val targetDir = s"target/ClusterSharding${AkkaSpec.getCallerName(getClass)}Spec-$mode-remember-$rememberEntities"
val modeConfig =
if (mode == ClusterShardingSettings.StateStoreModeDData) ConfigFactory.empty
else ConfigFactory.parseString(s"""
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
akka.persistence.journal.leveldb-shared.timeout = 5s
akka.persistence.journal.leveldb-shared.store.native = off
akka.persistence.journal.leveldb-shared.store.dir = "$targetDir/journal"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "$targetDir/snapshots"
""")
commonConfig(
overrides
.withFallback(modeConfig)
.withFallback(ConfigFactory.parseString(s"""
akka.loglevel = $loglevel
akka.actor.provider = "cluster"
akka.cluster.auto-down-unreachable-after = 0s
akka.remote.log-remote-lifecycle-events = off
akka.cluster.sharding.state-store-mode = "$mode"
akka.cluster.sharding.distributed-data.durable.lmdb {
dir = $targetDir/sharding-ddata
map-size = 10 MiB
}
"""))
.withFallback(SharedLeveldbJournal.configToEnableJavaSerializationForTest)
.withFallback(MultiNodeClusterSpec.clusterConfig))
}

View file

@ -0,0 +1,134 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding
import java.io.File
import scala.concurrent.duration._
import akka.actor.{ Actor, ActorIdentity, ActorRef, ActorSystem, Identify, PoisonPill, Props }
import akka.cluster.{ Cluster, MultiNodeClusterSpec }
import akka.persistence.Persistence
import akka.persistence.journal.leveldb.{ SharedLeveldbJournal, SharedLeveldbStore }
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeSpec
import akka.testkit.TestProbe
import akka.util.ccompat.ccompatUsedUntil213
import org.apache.commons.io.FileUtils
@ccompatUsedUntil213
object MultiNodeClusterShardingSpec {
final case class EntityStarted(ref: ActorRef)
def props(probe: ActorRef): Props = Props(new EntityActor(probe))
class EntityActor(probe: ActorRef) extends Actor {
probe ! EntityStarted(self)
def receive: Receive = {
case m => sender() ! m
}
}
val defaultExtractEntityId: ShardRegion.ExtractEntityId = {
case id: Int => (id.toString, id)
}
val defaultExtractShardId: ShardRegion.ExtractShardId = msg =>
msg match {
case id: Int => id.toString
case ShardRegion.StartEntity(id) => id
}
}
abstract class MultiNodeClusterShardingSpec(val config: MultiNodeClusterShardingConfig)
extends MultiNodeSpec(config)
with MultiNodeClusterSpec {
import MultiNodeClusterShardingSpec._
import config._
override def initialParticipants: Int = roles.size
protected val storageLocations = List(
new File(system.settings.config.getString("akka.cluster.sharding.distributed-data.durable.lmdb.dir")).getParentFile)
override protected def atStartup(): Unit = {
storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir))
enterBarrier("startup")
super.atStartup()
}
override protected def afterTermination(): Unit = {
storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir))
super.afterTermination()
}
protected def join(from: RoleName, to: RoleName): Unit = {
runOn(from) {
Cluster(system).join(node(to).address)
awaitAssert {
Cluster(system).state.isMemberUp(node(from).address)
}
}
enterBarrier(from.name + "-joined")
}
protected def startSharding(
sys: ActorSystem,
entityProps: Props,
dataType: String,
extractEntityId: ShardRegion.ExtractEntityId = defaultExtractEntityId,
extractShardId: ShardRegion.ExtractShardId = defaultExtractShardId,
handOffStopMessage: Any = PoisonPill): ActorRef = {
ClusterSharding(sys).start(
typeName = dataType,
entityProps = entityProps,
settings = ClusterShardingSettings(sys).withRememberEntities(rememberEntities),
extractEntityId = extractEntityId,
extractShardId = extractShardId,
ClusterSharding(sys).defaultShardAllocationStrategy(ClusterShardingSettings(sys)),
handOffStopMessage)
}
protected def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData
private def setStoreIfNotDdataMode(sys: ActorSystem, storeOn: RoleName): Unit =
if (!isDdataMode) {
val probe = TestProbe()(sys)
sys.actorSelection(node(storeOn) / "user" / "store").tell(Identify(None), probe.ref)
val sharedStore = probe.expectMsgType[ActorIdentity](20.seconds).ref.get
SharedLeveldbJournal.setStore(sharedStore, sys)
}
/**
* {{{
* startPersistence(startOn = first, setStoreOn = Seq(first, second, third))
* }}}
*
* @param startOn the node to start the `SharedLeveldbStore` store on
* @param setStoreOn the nodes to `SharedLeveldbJournal.setStore` on
*/
protected def startPersistenceIfNotDdataMode(startOn: RoleName, setStoreOn: Seq[RoleName]): Unit =
if (!isDdataMode) {
Persistence(system)
runOn(startOn) {
system.actorOf(Props[SharedLeveldbStore], "store")
}
enterBarrier("persistence-started")
runOn(setStoreOn: _*) {
setStoreIfNotDdataMode(system, startOn)
}
enterBarrier(s"after-${startOn.name}")
}
}