harden ClusterShardingRememberEntitiesSpecNewExtractorSpec
This commit is contained in:
parent
de51340ba2
commit
9835c08779
2 changed files with 39 additions and 33 deletions
|
|
@ -55,8 +55,6 @@ abstract class ClusterShardingRememberEntitiesNewExtractorSpecConfig(val mode: S
|
|||
val first = role("first")
|
||||
val second = role("second")
|
||||
val third = role("third")
|
||||
val fourth = role("fourth")
|
||||
val fifth = role("fifth")
|
||||
|
||||
commonConfig(ConfigFactory.parseString(s"""
|
||||
akka.loglevel = DEBUG
|
||||
|
|
@ -102,8 +100,6 @@ abstract class ClusterShardingRememberEntitiesNewExtractorSpecConfig(val mode: S
|
|||
|
||||
nodeConfig(second)(roleConfig.withFallback(ddataNodeAConfig))
|
||||
nodeConfig(third)(roleConfig.withFallback(ddataNodeBConfig))
|
||||
nodeConfig(fourth)(roleConfig.withFallback(ddataNodeAConfig))
|
||||
nodeConfig(fifth)(roleConfig.withFallback(ddataNodeBConfig))
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -118,8 +114,6 @@ class PersistentClusterShardingRememberEntitiesNewExtractorSpec extends ClusterS
|
|||
class PersistentClusterShardingRememberEntitiesNewExtractorMultiJvmNode1 extends PersistentClusterShardingRememberEntitiesNewExtractorSpec
|
||||
class PersistentClusterShardingRememberEntitiesNewExtractorMultiJvmNode2 extends PersistentClusterShardingRememberEntitiesNewExtractorSpec
|
||||
class PersistentClusterShardingRememberEntitiesNewExtractorMultiJvmNode3 extends PersistentClusterShardingRememberEntitiesNewExtractorSpec
|
||||
class PersistentClusterShardingRememberEntitiesNewExtractorMultiJvmNode4 extends PersistentClusterShardingRememberEntitiesNewExtractorSpec
|
||||
class PersistentClusterShardingRememberEntitiesNewExtractorMultiJvmNode5 extends PersistentClusterShardingRememberEntitiesNewExtractorSpec
|
||||
|
||||
class DDataClusterShardingRememberEntitiesNewExtractorSpec extends ClusterShardingRememberEntitiesNewExtractorSpec(
|
||||
DDataClusterShardingRememberEntitiesNewExtractorSpecConfig)
|
||||
|
|
@ -127,8 +121,6 @@ class DDataClusterShardingRememberEntitiesNewExtractorSpec extends ClusterShardi
|
|||
class DDataClusterShardingRememberEntitiesNewExtractorMultiJvmNode1 extends DDataClusterShardingRememberEntitiesNewExtractorSpec
|
||||
class DDataClusterShardingRememberEntitiesNewExtractorMultiJvmNode2 extends DDataClusterShardingRememberEntitiesNewExtractorSpec
|
||||
class DDataClusterShardingRememberEntitiesNewExtractorMultiJvmNode3 extends DDataClusterShardingRememberEntitiesNewExtractorSpec
|
||||
class DDataClusterShardingRememberEntitiesNewExtractorMultiJvmNode4 extends DDataClusterShardingRememberEntitiesNewExtractorSpec
|
||||
class DDataClusterShardingRememberEntitiesNewExtractorMultiJvmNode5 extends DDataClusterShardingRememberEntitiesNewExtractorSpec
|
||||
|
||||
abstract class ClusterShardingRememberEntitiesNewExtractorSpec(config: ClusterShardingRememberEntitiesNewExtractorSpecConfig) extends MultiNodeSpec(config) with STMultiNodeSpec with ImplicitSender {
|
||||
import ClusterShardingRememberEntitiesNewExtractorSpec._
|
||||
|
|
@ -168,16 +160,16 @@ abstract class ClusterShardingRememberEntitiesNewExtractorSpec(config: ClusterSh
|
|||
extractShardId = extractShardId1)
|
||||
}
|
||||
|
||||
def startShardingWithExtractor2(): Unit = {
|
||||
ClusterSharding(system).start(
|
||||
def startShardingWithExtractor2(sys: ActorSystem, probe: ActorRef): Unit = {
|
||||
ClusterSharding(sys).start(
|
||||
typeName = typeName,
|
||||
entityProps = ClusterShardingRememberEntitiesNewExtractorSpec.props(Some(testActor)),
|
||||
entityProps = ClusterShardingRememberEntitiesNewExtractorSpec.props(Some(probe)),
|
||||
settings = ClusterShardingSettings(system).withRememberEntities(true).withRole("sharding"),
|
||||
extractEntityId = extractEntityId,
|
||||
extractShardId = extractShardId2)
|
||||
}
|
||||
|
||||
lazy val region = ClusterSharding(system).shardRegion(typeName)
|
||||
def region(sys: ActorSystem = system) = ClusterSharding(sys).shardRegion(typeName)
|
||||
|
||||
def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData
|
||||
|
||||
|
|
@ -192,7 +184,7 @@ abstract class ClusterShardingRememberEntitiesNewExtractorSpec(config: ClusterSh
|
|||
}
|
||||
enterBarrier("persistence-started")
|
||||
|
||||
runOn(second, third, fourth, fifth) {
|
||||
runOn(second, third) {
|
||||
system.actorSelection(node(first) / "user" / "store") ! Identify(None)
|
||||
val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get
|
||||
SharedLeveldbJournal.setStore(sharedStore, system)
|
||||
|
|
@ -222,7 +214,7 @@ abstract class ClusterShardingRememberEntitiesNewExtractorSpec(config: ClusterSh
|
|||
runOn(second, third) {
|
||||
// one entity for each shard id
|
||||
(1 to 10).foreach { n ⇒
|
||||
region ! n
|
||||
region() ! n
|
||||
expectMsg(n)
|
||||
}
|
||||
}
|
||||
|
|
@ -244,35 +236,40 @@ abstract class ClusterShardingRememberEntitiesNewExtractorSpec(config: ClusterSh
|
|||
enterBarrier("first-sharding-cluster-stopped")
|
||||
}
|
||||
|
||||
"start new nodes with different extractor" in within(15.seconds) {
|
||||
"start new nodes with different extractor, and have the entities running on the right shards" in within(30.seconds) {
|
||||
|
||||
// start it with a new shard id extractor, which will put the entities
|
||||
// on different shards
|
||||
|
||||
join(fourth, first)
|
||||
join(fifth, first)
|
||||
runOn(first) {
|
||||
within(remaining) {
|
||||
awaitAssert {
|
||||
cluster.state.members.count(_.status == MemberStatus.Up) should ===(3)
|
||||
}
|
||||
runOn(second, third) {
|
||||
watch(region())
|
||||
Cluster(system).leave(Cluster(system).selfAddress)
|
||||
expectTerminated(region())
|
||||
awaitAssert {
|
||||
Cluster(system).isTerminated should ===(true)
|
||||
}
|
||||
}
|
||||
runOn(fourth, fifth) {
|
||||
startShardingWithExtractor2()
|
||||
}
|
||||
enterBarrier("first-cluster-terminated")
|
||||
|
||||
// TODO how do we know that the shards has started??
|
||||
Thread.sleep(7000)
|
||||
enterBarrier("new-nodes-started")
|
||||
}
|
||||
// no sharding nodes left of the original cluster, start a new nodes
|
||||
runOn(second, third) {
|
||||
val sys2 = ActorSystem(system.name, system.settings.config)
|
||||
val probe2 = TestProbe()(sys2)
|
||||
|
||||
if (!isDdataMode) {
|
||||
sys2.actorSelection(node(first) / "user" / "store").tell(Identify(None), probe2.ref)
|
||||
val sharedStore = probe2.expectMsgType[ActorIdentity](10.seconds).ref.get
|
||||
SharedLeveldbJournal.setStore(sharedStore, sys2)
|
||||
}
|
||||
|
||||
Cluster(sys2).join(node(first).address)
|
||||
startShardingWithExtractor2(sys2, probe2.ref)
|
||||
probe2.expectMsgType[Started](20.seconds)
|
||||
|
||||
"have the remembered entities running on the right shards" in within(15.seconds) {
|
||||
runOn(fourth, fifth) {
|
||||
var stats: ShardRegion.CurrentShardRegionState = null
|
||||
within(remaining) {
|
||||
within(10.seconds) {
|
||||
awaitAssert {
|
||||
region ! ShardRegion.GetShardRegionState
|
||||
region(sys2) ! ShardRegion.GetShardRegionState
|
||||
val reply = expectMsgType[ShardRegion.CurrentShardRegionState]
|
||||
reply.shards should not be empty
|
||||
stats = reply
|
||||
|
|
@ -286,6 +283,13 @@ abstract class ClusterShardingRememberEntitiesNewExtractorSpec(config: ClusterSh
|
|||
val calculatedShardId = extractShardId2(entityId.toInt)
|
||||
calculatedShardId should ===(shardState.shardId)
|
||||
}
|
||||
|
||||
enterBarrier("verified")
|
||||
shutdown(sys2)
|
||||
}
|
||||
|
||||
runOn(first) {
|
||||
enterBarrier("verified")
|
||||
}
|
||||
|
||||
enterBarrier("done")
|
||||
|
|
|
|||
|
|
@ -225,6 +225,8 @@ abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememb
|
|||
Cluster(sys2).join(Cluster(sys2).selfAddress)
|
||||
startSharding(sys2, probe2.ref)
|
||||
probe2.expectMsgType[Started](20.seconds)
|
||||
|
||||
shutdown(sys2)
|
||||
}
|
||||
enterBarrier("after-3")
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue