From ffe29929177e41fd3bcccd0f76ca0b19f6373c8c Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 3 Nov 2014 08:09:32 +0100 Subject: [PATCH] =con #15788 Harden ClusterShardingSpec --- .../contrib/pattern/ClusterShardingSpec.scala | 88 ++++++++----------- 1 file changed, 39 insertions(+), 49 deletions(-) diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala index 6667510810..0ca32ee6d9 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala @@ -182,16 +182,21 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult def createCoordinator(): Unit = { val allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1) - val coordinatorProps = ShardCoordinator.props(handOffTimeout = 10.seconds, shardStartTimeout = 10.seconds, rebalanceInterval = 2.seconds, - snapshotInterval = 3600.seconds, allocationStrategy) + def coordinatorProps(rebalanceEnabled: Boolean) = + ShardCoordinator.props(handOffTimeout = 10.seconds, shardStartTimeout = 10.seconds, + rebalanceInterval = if (rebalanceEnabled) 2.seconds else 3600.seconds, + snapshotInterval = 3600.seconds, allocationStrategy) - for (coordinatorName ← List("counter", "PersistentCounterEntries", "AnotherPersistentCounter", "PersistentCounter", "AutoMigrateRegionTest")) - system.actorOf(ClusterSingletonManager.props( - singletonProps = ShardCoordinatorSupervisor.props(failureBackoff = 5.seconds, coordinatorProps), - singletonName = "singleton", - terminationMessage = PoisonPill, - role = None), - name = coordinatorName + "Coordinator") + List("counter", "rebalancingCounter", "PersistentCounterEntries", "AnotherPersistentCounter", + "PersistentCounter", "RebalancingPersistentCounter", "AutoMigrateRegionTest").foreach { coordinatorName ⇒ + val rebalanceEnabled = coordinatorName.toLowerCase.startsWith("rebalancing") + system.actorOf(ClusterSingletonManager.props( + singletonProps = ShardCoordinatorSupervisor.props(failureBackoff = 5.seconds, coordinatorProps(rebalanceEnabled)), + singletonName = "singleton", + terminationMessage = PoisonPill, + role = None), + name = coordinatorName + "Coordinator") + } } def createRegion(typeName: String, rememberEntries: Boolean): ActorRef = system.actorOf(ShardRegion.props( @@ -210,10 +215,12 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult name = typeName + "Region") lazy val region = createRegion("counter", rememberEntries = false) + lazy val rebalancingRegion = createRegion("rebalancingCounter", rememberEntries = false) lazy val persistentEntriesRegion = createRegion("PersistentCounterEntries", rememberEntries = true) lazy val anotherPersistentRegion = createRegion("AnotherPersistentCounter", rememberEntries = true) lazy val persistentRegion = createRegion("PersistentCounter", rememberEntries = true) + lazy val rebalancingPersistentRegion = createRegion("RebalancingPersistentCounter", rememberEntries = true) lazy val autoMigrateRegion = createRegion("AutoMigrateRegionTest", rememberEntries = true) "Cluster sharding" must { @@ -350,6 +357,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult region ! EntryEnvelope(3, Increment) region ! Get(3) expectMsg(10) + lastSender.path should be(region.path / "3" / "3") // local } enterBarrier("third-update") @@ -358,6 +366,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult region ! EntryEnvelope(4, Increment) region ! Get(4) expectMsg(20) + lastSender.path should be(region.path / "4" / "4") // local } enterBarrier("fourth-update") @@ -423,34 +432,13 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult "rebalance to nodes with less shards" in within(60 seconds) { runOn(fourth) { - // third, fourth and fifth are still alive - // shards 3 and 4 are already allocated - // make sure shards 1 and 2 (previously on crashed first) are allocated - awaitAssert { - val probe1 = TestProbe() - within(1.second) { - region.tell(Get(1), probe1.ref) - probe1.expectMsg(2) - } - } - awaitAssert { - val probe2 = TestProbe() - within(1.second) { - region.tell(Get(2), probe2.ref) - probe2.expectMsg(4) - } - } - - // add more shards, which should later trigger rebalance to new node sixth - for (n ← 5 to 10) - region ! EntryEnvelope(n, Increment) - - for (n ← 5 to 10) { - region ! Get(n) + for (n ← 1 to 10) { + rebalancingRegion ! EntryEnvelope(n, Increment) + rebalancingRegion ! Get(n) expectMsg(1) } } - enterBarrier("more-added") + enterBarrier("rebalancing-shards-allocated") join(sixth, third) @@ -460,9 +448,9 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult within(3.seconds) { var count = 0 for (n ← 1 to 10) { - region.tell(Get(n), probe.ref) + rebalancingRegion.tell(Get(n), probe.ref) probe.expectMsgType[Int] - if (probe.lastSender.path == region.path / (n % 12).toString / n.toString) + if (probe.lastSender.path == rebalancingRegion.path / (n % 12).toString / n.toString) count += 1 } count should be >= (2) @@ -724,6 +712,8 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult //Start only one region, and force an entry onto that region runOn(third) { autoMigrateRegion ! EntryEnvelope(1, Increment) + autoMigrateRegion ! Get(1) + expectMsg(1) } enterBarrier("shard1-region3") @@ -760,34 +750,34 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult "ensure rebalance restarts shards" in within(50.seconds) { runOn(fourth) { for (i ← 2 to 12) { - persistentRegion ! EntryEnvelope(i, Increment) + rebalancingPersistentRegion ! EntryEnvelope(i, Increment) } for (i ← 2 to 12) { - persistentRegion ! Get(i) + rebalancingPersistentRegion ! Get(i) expectMsg(1) } } enterBarrier("entries-started") runOn(fifth) { - persistentRegion + rebalancingPersistentRegion } enterBarrier("fifth-joined-shard") runOn(fifth) { - var count = 0 - - for (n ← 2 to 12) { - val entry = system.actorSelection(system / "PersistentCounterRegion" / (n % 12).toString / n.toString) - entry ! Identify(n) - receiveOne(3 seconds) match { - case ActorIdentity(id, Some(_)) if id == n ⇒ count = count + 1 - case ActorIdentity(id, None) ⇒ //Not on the fifth shard + awaitAssert { + var count = 0 + for (n ← 2 to 12) { + val entry = system.actorSelection(rebalancingPersistentRegion.path / (n % 12).toString / n.toString) + entry ! Identify(n) + receiveOne(3 seconds) match { + case ActorIdentity(id, Some(_)) if id == n ⇒ count = count + 1 + case ActorIdentity(id, None) ⇒ //Not on the fifth shard + } } + count should be >= (2) } - - assert(count >= 3, s"Not enough entries migrated, only ${count}") } enterBarrier("after-15")