=con #15788 Harden ClusterShardingSpec

This commit is contained in:
Patrik Nordwall 2014-11-03 08:09:32 +01:00
parent f2f88d9dd7
commit ffe2992917

View file

@ -182,16 +182,21 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
def createCoordinator(): Unit = {
val allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1)
val coordinatorProps = ShardCoordinator.props(handOffTimeout = 10.seconds, shardStartTimeout = 10.seconds, rebalanceInterval = 2.seconds,
snapshotInterval = 3600.seconds, allocationStrategy)
def coordinatorProps(rebalanceEnabled: Boolean) =
ShardCoordinator.props(handOffTimeout = 10.seconds, shardStartTimeout = 10.seconds,
rebalanceInterval = if (rebalanceEnabled) 2.seconds else 3600.seconds,
snapshotInterval = 3600.seconds, allocationStrategy)
for (coordinatorName List("counter", "PersistentCounterEntries", "AnotherPersistentCounter", "PersistentCounter", "AutoMigrateRegionTest"))
system.actorOf(ClusterSingletonManager.props(
singletonProps = ShardCoordinatorSupervisor.props(failureBackoff = 5.seconds, coordinatorProps),
singletonName = "singleton",
terminationMessage = PoisonPill,
role = None),
name = coordinatorName + "Coordinator")
List("counter", "rebalancingCounter", "PersistentCounterEntries", "AnotherPersistentCounter",
"PersistentCounter", "RebalancingPersistentCounter", "AutoMigrateRegionTest").foreach { coordinatorName
val rebalanceEnabled = coordinatorName.toLowerCase.startsWith("rebalancing")
system.actorOf(ClusterSingletonManager.props(
singletonProps = ShardCoordinatorSupervisor.props(failureBackoff = 5.seconds, coordinatorProps(rebalanceEnabled)),
singletonName = "singleton",
terminationMessage = PoisonPill,
role = None),
name = coordinatorName + "Coordinator")
}
}
def createRegion(typeName: String, rememberEntries: Boolean): ActorRef = system.actorOf(ShardRegion.props(
@ -210,10 +215,12 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
name = typeName + "Region")
lazy val region = createRegion("counter", rememberEntries = false)
lazy val rebalancingRegion = createRegion("rebalancingCounter", rememberEntries = false)
lazy val persistentEntriesRegion = createRegion("PersistentCounterEntries", rememberEntries = true)
lazy val anotherPersistentRegion = createRegion("AnotherPersistentCounter", rememberEntries = true)
lazy val persistentRegion = createRegion("PersistentCounter", rememberEntries = true)
lazy val rebalancingPersistentRegion = createRegion("RebalancingPersistentCounter", rememberEntries = true)
lazy val autoMigrateRegion = createRegion("AutoMigrateRegionTest", rememberEntries = true)
"Cluster sharding" must {
@ -350,6 +357,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
region ! EntryEnvelope(3, Increment)
region ! Get(3)
expectMsg(10)
lastSender.path should be(region.path / "3" / "3") // local
}
enterBarrier("third-update")
@ -358,6 +366,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
region ! EntryEnvelope(4, Increment)
region ! Get(4)
expectMsg(20)
lastSender.path should be(region.path / "4" / "4") // local
}
enterBarrier("fourth-update")
@ -423,34 +432,13 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
"rebalance to nodes with less shards" in within(60 seconds) {
runOn(fourth) {
// third, fourth and fifth are still alive
// shards 3 and 4 are already allocated
// make sure shards 1 and 2 (previously on crashed first) are allocated
awaitAssert {
val probe1 = TestProbe()
within(1.second) {
region.tell(Get(1), probe1.ref)
probe1.expectMsg(2)
}
}
awaitAssert {
val probe2 = TestProbe()
within(1.second) {
region.tell(Get(2), probe2.ref)
probe2.expectMsg(4)
}
}
// add more shards, which should later trigger rebalance to new node sixth
for (n 5 to 10)
region ! EntryEnvelope(n, Increment)
for (n 5 to 10) {
region ! Get(n)
for (n 1 to 10) {
rebalancingRegion ! EntryEnvelope(n, Increment)
rebalancingRegion ! Get(n)
expectMsg(1)
}
}
enterBarrier("more-added")
enterBarrier("rebalancing-shards-allocated")
join(sixth, third)
@ -460,9 +448,9 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
within(3.seconds) {
var count = 0
for (n 1 to 10) {
region.tell(Get(n), probe.ref)
rebalancingRegion.tell(Get(n), probe.ref)
probe.expectMsgType[Int]
if (probe.lastSender.path == region.path / (n % 12).toString / n.toString)
if (probe.lastSender.path == rebalancingRegion.path / (n % 12).toString / n.toString)
count += 1
}
count should be >= (2)
@ -724,6 +712,8 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
//Start only one region, and force an entry onto that region
runOn(third) {
autoMigrateRegion ! EntryEnvelope(1, Increment)
autoMigrateRegion ! Get(1)
expectMsg(1)
}
enterBarrier("shard1-region3")
@ -760,34 +750,34 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
"ensure rebalance restarts shards" in within(50.seconds) {
runOn(fourth) {
for (i 2 to 12) {
persistentRegion ! EntryEnvelope(i, Increment)
rebalancingPersistentRegion ! EntryEnvelope(i, Increment)
}
for (i 2 to 12) {
persistentRegion ! Get(i)
rebalancingPersistentRegion ! Get(i)
expectMsg(1)
}
}
enterBarrier("entries-started")
runOn(fifth) {
persistentRegion
rebalancingPersistentRegion
}
enterBarrier("fifth-joined-shard")
runOn(fifth) {
var count = 0
for (n 2 to 12) {
val entry = system.actorSelection(system / "PersistentCounterRegion" / (n % 12).toString / n.toString)
entry ! Identify(n)
receiveOne(3 seconds) match {
case ActorIdentity(id, Some(_)) if id == n count = count + 1
case ActorIdentity(id, None) //Not on the fifth shard
awaitAssert {
var count = 0
for (n 2 to 12) {
val entry = system.actorSelection(rebalancingPersistentRegion.path / (n % 12).toString / n.toString)
entry ! Identify(n)
receiveOne(3 seconds) match {
case ActorIdentity(id, Some(_)) if id == n count = count + 1
case ActorIdentity(id, None) //Not on the fifth shard
}
}
count should be >= (2)
}
assert(count >= 3, s"Not enough entries migrated, only ${count}")
}
enterBarrier("after-15")