=con #15699 Fix race in Cluster Sharding tests

Dominic Black 2014-08-21 16:39:24 +01:00
parent 296f5a7cab
commit d4047a2e1f
2 changed files with 43 additions and 61 deletions


@@ -659,7 +659,7 @@ class ShardRegion(
   var shardsByRef = Map.empty[ActorRef, ShardId]
   var handingOff = Set.empty[ActorRef]
 
-  def totalBufferSize = shardBuffers.map { case (_, buf) ⇒ buf.size }.sum
+  def totalBufferSize = shardBuffers.foldLeft(0) { (sum, entry) ⇒ sum + entry._2.size }
 
   import context.dispatcher
   val retryTask = context.system.scheduler.schedule(retryInterval, retryInterval, self, Retry)
@@ -786,7 +786,7 @@ class ShardRegion(
       shardBuffers -= shard
       if (shards.contains(shard)) {
-        handingOff = handingOff + shards(shard)
+        handingOff += shards(shard)
         shards(shard) forward msg
       } else
         sender() ! ShardStopped(shard)
@@ -1009,7 +1009,7 @@ private[akka] class Shard(
   var handOffStopper: Option[ActorRef] = None
 
-  def totalBufferSize = messageBuffers.map { case (_, buf) ⇒ buf.size }.sum
+  def totalBufferSize = messageBuffers.foldLeft(0) { (sum, entry) ⇒ sum + entry._2.size }
 
   def processChange[A](event: A)(handler: A ⇒ Unit): Unit =
     if (rememberEntries) persist(event)(handler)
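Both totalBufferSize rewrites above replace a two-pass map-then-sum with a single foldLeft over the buffer map, so no intermediate collection of sizes is built. A minimal standalone sketch of the equivalence (buffer contents hypothetical):

    object BufferSizeSketch extends App {
      // Hypothetical buffers keyed by shard id, mirroring shardBuffers/messageBuffers.
      val buffers = Map("a" -> Vector(1, 2, 3), "b" -> Vector(4, 5))

      // Old style: builds an intermediate collection of sizes, then sums it.
      val twoPass = buffers.map { case (_, buf) => buf.size }.sum

      // New style: one pass, accumulating directly into the running sum.
      val onePass = buffers.foldLeft(0) { (sum, entry) => sum + entry._2.size }

      assert(twoPass == 5 && onePass == 5)
    }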


@@ -9,10 +9,7 @@ import akka.contrib.pattern.ShardRegion.Passivate
 import language.postfixOps
 import scala.concurrent.duration._
 import com.typesafe.config.ConfigFactory
-import akka.actor.ActorIdentity
-import akka.actor.Identify
-import akka.actor.PoisonPill
-import akka.actor.Props
+import akka.actor._
 import akka.cluster.Cluster
 import akka.persistence.PersistentActor
 import akka.persistence.Persistence
@@ -26,8 +23,6 @@ import akka.testkit._
 import akka.testkit.TestEvent.Mute
 import java.io.File
 import org.apache.commons.io.FileUtils
-import akka.actor.ReceiveTimeout
-import akka.actor.ActorRef
 
 object ClusterShardingSpec extends MultiNodeConfig {
   val controller = role("controller")
@@ -169,35 +164,37 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
     val allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1)
     val coordinatorProps = ShardCoordinator.props(handOffTimeout = 10.seconds, shardStartTimeout = 10.seconds, rebalanceInterval = 2.seconds,
       snapshotInterval = 3600.seconds, allocationStrategy)
+    for (coordinatorName ← List("counter", "PersistentCounterEntries", "AnotherPersistentCounter", "PersistentCounter", "AutoMigrateRegionTest"))
       system.actorOf(ClusterSingletonManager.props(
         singletonProps = ShardCoordinatorSupervisor.props(failureBackoff = 5.seconds, coordinatorProps),
         singletonName = "singleton",
         terminationMessage = PoisonPill,
         role = None),
-        name = "counterCoordinator")
+        name = coordinatorName + "Coordinator")
   }
 
-  lazy val region = system.actorOf(ShardRegion.props(
-    typeName = "counter",
+  def createRegion(typeName: String, rememberEntries: Boolean): ActorRef = system.actorOf(ShardRegion.props(
+    typeName = typeName,
     entryProps = Props[Counter],
     role = None,
-    coordinatorPath = "/user/counterCoordinator/singleton/coordinator",
+    coordinatorPath = "/user/" + typeName + "Coordinator/singleton/coordinator",
     retryInterval = 1.second,
     shardFailureBackoff = 1.second,
     entryRestartBackoff = 1.second,
     snapshotInterval = 1.hour,
     bufferSize = 1000,
-    rememberEntries = false,
+    rememberEntries = rememberEntries,
     idExtractor = idExtractor,
     shardResolver = shardResolver),
-    name = "counterRegion")
+    name = typeName + "Region")
 
-  lazy val persistentRegion = ClusterSharding(system).start(
-    typeName = "PersistentCounter",
-    entryProps = Some(Props[Counter]),
-    rememberEntries = true,
-    idExtractor = idExtractor,
-    shardResolver = shardResolver)
+  lazy val region = createRegion("counter", rememberEntries = false)
+
+  lazy val persistentEntriesRegion = createRegion("PersistentCounterEntries", rememberEntries = true)
+  lazy val anotherPersistentRegion = createRegion("AnotherPersistentCounter", rememberEntries = true)
+  lazy val persistentRegion = createRegion("PersistentCounter", rememberEntries = true)
+  lazy val autoMigrateRegion = createRegion("AutoMigrateRegionTest", rememberEntries = true)
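The createRegion helper above gives every test scenario a region and coordinator of its own, keyed by typeName, instead of all scenarios sharing the single "counter" pair; that isolation is what removes the cross-test interference. A small sketch of the naming scheme the helper relies on (derived directly from the props above):

    // For a given typeName the spec derives:
    //   singleton manager:  /user/<typeName>Coordinator
    //   coordinator path:   /user/<typeName>Coordinator/singleton/coordinator
    //   shard region name:  <typeName>Region
    def coordinatorPathFor(typeName: String): String =
      "/user/" + typeName + "Coordinator/singleton/coordinator"

    def regionNameFor(typeName: String): String = typeName + "Region"

    assert(coordinatorPathFor("PersistentCounter") ==
      "/user/PersistentCounterCoordinator/singleton/coordinator")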
"Cluster sharding" must { "Cluster sharding" must {
@@ -547,28 +544,15 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
   "Persistent Cluster Shards" must {
     "recover entries upon restart" in within(50.seconds) {
       runOn(third, fourth, fifth) {
-        ClusterSharding(system).start(
-          typeName = "PersistentCounterEntries",
-          entryProps = Some(Props[Counter]),
-          rememberEntries = true,
-          idExtractor = idExtractor,
-          shardResolver = shardResolver)
-        ClusterSharding(system).start(
-          typeName = "AnotherPersistentCounter",
-          entryProps = Some(Props[Counter]),
-          rememberEntries = true,
-          idExtractor = idExtractor,
-          shardResolver = shardResolver)
+        persistentEntriesRegion
+        anotherPersistentRegion
       }
       enterBarrier("persistent-started")
 
       runOn(third) {
-        val counterRegion: ActorRef = ClusterSharding(system).shardRegion("PersistentCounterEntries")
         //Create an increment counter 1
-        counterRegion ! EntryEnvelope(1, Increment)
-        counterRegion ! Get(1)
+        persistentEntriesRegion ! EntryEnvelope(1, Increment)
+        persistentEntriesRegion ! Get(1)
         expectMsg(1)
 
         //Shut down the shard and confirm it's dead
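Merely referencing persistentEntriesRegion and anotherPersistentRegion inside runOn is enough to start them: touching a lazy val runs its initializer, which here calls system.actorOf on exactly those nodes. A tiny sketch of that idiom (names hypothetical):

    object LazyStartSketch extends App {
      var started = List.empty[String]

      // Mirrors the spec's lazy regions: nothing starts until first reference.
      lazy val regionA = { started :+= "A"; "regionA-ref" }
      lazy val regionB = { started :+= "B"; "regionB-ref" }

      assert(started.isEmpty) // not yet initialized
      regionA                 // first touch runs the initializer, like actorOf in the spec
      regionB
      assert(started == List("A", "B"))
    }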
@@ -585,7 +569,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
         }, 5 seconds, 500 millis)
 
         //Get the path to where the shard now resides
-        counterRegion ! Get(13)
+        persistentEntriesRegion ! Get(13)
         expectMsg(0)
 
         //Check that counter 1 is now alive again, even though we have
@@ -605,11 +589,10 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
       runOn(fourth) {
         //Check a second region does not share the same persistent shards
-        val anotherRegion: ActorRef = ClusterSharding(system).shardRegion("AnotherPersistentCounter")
 
         //Create a separate 13 counter
-        anotherRegion ! EntryEnvelope(13, Increment)
-        anotherRegion ! Get(13)
+        anotherPersistentRegion ! EntryEnvelope(13, Increment)
+        anotherPersistentRegion ! Get(13)
         expectMsg(1)
 
         //Check that no counter "1" exists in this shard
@@ -624,7 +607,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
       enterBarrier("after-11")
     }
 
-    "permanently stop entries which passivate" in within(50.seconds) {
+    "permanently stop entries which passivate" in within(15.seconds) {
       runOn(third, fourth, fifth) {
         persistentRegion
       }
@@ -650,7 +633,12 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
         counter1.path.parent should be(counter13.path.parent)
 
         //Send the shard the passivate message from the counter
+        watch(counter1)
         shard.tell(Passivate(Stop), counter1)
 
+        //Watch for the terminated message
+        expectTerminated(counter1, 5 seconds)
+
         awaitAssert({
           //Check counter 1 is dead
           counter1 ! Identify(1)
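This hunk is the heart of the race fix in the passivation test: rather than asserting right after sending Passivate, the test death-watches the entry and blocks until its Terminated message arrives. A minimal akka-testkit sketch of the watch/expectTerminated pattern (actor and system names hypothetical):

    import akka.actor.{ Actor, ActorSystem, PoisonPill, Props }
    import akka.testkit.TestKit
    import scala.concurrent.duration._

    object WatchSketch extends App {
      val system = ActorSystem("WatchSketch")
      val probe = new TestKit(system)

      // Stand-in for the passivated counter entry.
      val entry = system.actorOf(Props(new Actor { def receive = { case _ => } }))

      probe.watch(entry)                       // register death watch before triggering the stop
      entry ! PoisonPill                       // stand-in for the Passivate(Stop) round-trip
      probe.expectTerminated(entry, 5.seconds) // deterministic: wait for Terminated, no polling
      TestKit.shutdownActorSystem(system)
    }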
@@ -719,27 +707,21 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
       enterBarrier("after-13")
     }
 
-    "be migrated to new regions upon region failure" in within(50.seconds) {
-      lazy val migrationRegion: ActorRef = ClusterSharding(system).start(
-        typeName = "AutoMigrateRegionTest",
-        entryProps = Some(Props[Counter]),
-        rememberEntries = true,
-        idExtractor = idExtractor,
-        shardResolver = shardResolver)
+    "be migrated to new regions upon region failure" in within(15.seconds) {
 
       //Start only one region, and force an entry onto that region
       runOn(third) {
-        migrationRegion ! EntryEnvelope(1, Increment)
+        autoMigrateRegion ! EntryEnvelope(1, Increment)
       }
       enterBarrier("shard1-region3")
 
       //Start another region and test it talks to node 3
       runOn(fourth) {
-        migrationRegion ! EntryEnvelope(1, Increment)
-        migrationRegion ! Get(1)
+        autoMigrateRegion ! EntryEnvelope(1, Increment)
+        autoMigrateRegion ! Get(1)
         expectMsg(2)
-        lastSender.path should be(node(third) / "user" / "sharding" / "AutoMigrateRegionTest" / "1" / "1")
+        lastSender.path should be(node(third) / "user" / "AutoMigrateRegionTestRegion" / "1" / "1")
 
         //Kill region 3
         system.actorSelection(lastSender.path.parent.parent) ! PoisonPill
@@ -751,7 +733,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
       //Test the shard, thus counter was moved onto node 4 and started.
       runOn(fourth) {
-        val counter1 = system.actorSelection(system / "sharding" / "AutoMigrateRegionTest" / "1" / "1")
+        val counter1 = system.actorSelection(system / "AutoMigrateRegionTestRegion" / "1" / "1")
         counter1 ! Identify(1)
         receiveOne(1 second) match {
           case ActorIdentity(1, location) ⇒
@@ -787,7 +769,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
         var count = 0
         for (n ← 2 to 12) {
-          var entry = system.actorSelection(system / "sharding" / "PersistentCounter" / (n % 12).toString / n.toString)
+          val entry = system.actorSelection(system / "PersistentCounterRegion" / (n % 12).toString / n.toString)
           entry ! Identify(n)
           receiveOne(1 second) match {
             case ActorIdentity(id, Some(_)) if id == n ⇒ count = count + 1
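The rebalance assertion here counts how many entries answer an Identify probe with a populated ActorIdentity. A compact sketch of that counting idiom (selection path as in the spec, run against a started region):

    import akka.actor.{ ActorIdentity, ActorSystem, Identify }
    import akka.testkit.TestKit
    import scala.concurrent.duration._

    object IdentifyCountSketch {
      def liveEntries(probe: TestKit, system: ActorSystem): Int = {
        var count = 0
        for (n <- 2 to 12) {
          val entry = system.actorSelection(system / "PersistentCounterRegion" / (n % 12).toString / n.toString)
          entry.tell(Identify(n), probe.testActor)
          probe.receiveOne(1.second) match {
            case ActorIdentity(id, Some(_)) if id == n => count += 1 // entry alive on this node
            case _                                     => // ActorIdentity(_, None) or timeout: not here
          }
        }
        count
      }
    }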