From a601f36b203982cd42c032f6d6263268ce2bc9ec Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Wed, 17 Nov 2021 01:00:05 +1300 Subject: [PATCH] Add access pattern simulator for testing entity passivation strategies (#30884) * with licence comment for ycsb --- .../test/resources/arc-trace-database.conf | 98 ++++++++ .../src/test/resources/arc-trace-search.conf | 82 +++++++ .../src/test/resources/reference.conf | 46 ++++ .../src/test/resources/synthetic-zipfian.conf | 51 +++++ .../passivation/simulator/AccessPattern.scala | 104 +++++++++ .../passivation/simulator/Simulator.scala | 213 ++++++++++++++++++ .../simulator/SimulatorSettings.scala | 123 ++++++++++ .../simulator/SimulatorStats.scala | 140 ++++++++++++ project/AkkaDisciplinePlugin.scala | 3 +- project/Dependencies.scala | 6 +- 10 files changed, 864 insertions(+), 2 deletions(-) create mode 100644 akka-cluster-sharding/src/test/resources/arc-trace-database.conf create mode 100644 akka-cluster-sharding/src/test/resources/arc-trace-search.conf create mode 100644 akka-cluster-sharding/src/test/resources/reference.conf create mode 100644 akka-cluster-sharding/src/test/resources/synthetic-zipfian.conf create mode 100644 akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/simulator/AccessPattern.scala create mode 100644 akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/simulator/Simulator.scala create mode 100644 akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/simulator/SimulatorSettings.scala create mode 100644 akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/simulator/SimulatorStats.scala diff --git a/akka-cluster-sharding/src/test/resources/arc-trace-database.conf b/akka-cluster-sharding/src/test/resources/arc-trace-database.conf new file mode 100644 index 0000000000..6f19c316d4 --- /dev/null +++ b/akka-cluster-sharding/src/test/resources/arc-trace-database.conf @@ -0,0 +1,98 @@ +# +# Run the "database" trace (DS1) from the authors of the ARC algorithm. +# +# Nimrod Megiddo and Dharmendra S. Modha, "ARC: A Self-Tuning, Low Overhead Replacement Cache". +# +# Download traces from the author's home page: +# https://researcher.watson.ibm.com/researcher/view_person_subpage.php?id=4700 +# +# > akka-cluster-sharding/Test/runMain akka.cluster.sharding.passivation.simulator.Simulator arc-trace-database +# +# ╔════════╤═════════╤════════════╤═════════════╤══════════════╗ +# ║ Run │ Active │ Accesses │ Activations │ Passivations ║ +# ╠════════╪═════════╪════════════╪═════════════╪══════════════╣ +# ║ LRU 1M │ 3.09 % │ 43,704,979 │ 42,356,500 │ 41,356,500 ║ +# ╟────────┼─────────┼────────────┼─────────────┼──────────────╢ +# ║ LRU 2M │ 10.75 % │ 43,704,979 │ 39,007,141 │ 37,007,141 ║ +# ╟────────┼─────────┼────────────┼─────────────┼──────────────╢ +# ║ LRU 4M │ 20.24 % │ 43,704,979 │ 34,857,131 │ 30,857,131 ║ +# ╟────────┼─────────┼────────────┼─────────────┼──────────────╢ +# ║ LRU 8M │ 43.03 % │ 43,704,979 │ 24,896,638 │ 16,896,638 ║ +# ╚════════╧═════════╧════════════╧═════════════╧══════════════╝ +# + +arc-traces="arc-traces" +arc-traces=${?ARC_TRACES} + +akka.cluster.sharding { + passivation.simulator { + runs = [ + { + name = "LRU 1M" + shards = 100 + regions = 10 + pattern = arc-database + strategy = lru-100k + }, + { + name = "LRU 2M" + shards = 100 + regions = 10 + pattern = arc-database + strategy = lru-200k + }, + { + name = "LRU 4M" + shards = 100 + regions = 10 + pattern = arc-database + strategy = lru-400k + }, + { + name = "LRU 8M" + shards = 100 + regions = 10 + pattern = arc-database + strategy = lru-800k + } + ] + + print-detailed-stats = true + + arc-database { + pattern = trace + trace { + format = arc + path = ${arc-traces}"/DS1.lis" + } + } + + lru-100k { + strategy = least-recently-used + least-recently-used { + per-region-limit = 100000 + } + } + + lru-200k { + strategy = least-recently-used + least-recently-used { + per-region-limit = 200000 + } + } + + lru-400k { + strategy = least-recently-used + least-recently-used { + per-region-limit = 400000 + } + } + + lru-800k { + strategy = least-recently-used + least-recently-used { + per-region-limit = 800000 + } + } + } +} diff --git a/akka-cluster-sharding/src/test/resources/arc-trace-search.conf b/akka-cluster-sharding/src/test/resources/arc-trace-search.conf new file mode 100644 index 0000000000..e6e8868529 --- /dev/null +++ b/akka-cluster-sharding/src/test/resources/arc-trace-search.conf @@ -0,0 +1,82 @@ +# +# Run the merged "search" trace (MergeS) from the authors of the ARC algorithm. +# +# Nimrod Megiddo and Dharmendra S. Modha, "ARC: A Self-Tuning, Low Overhead Replacement Cache". +# +# Download traces from the author's home page: +# https://researcher.watson.ibm.com/researcher/view_person_subpage.php?id=4700 +# +# > akka-cluster-sharding/Test/runMain akka.cluster.sharding.passivation.simulator.Simulator arc-trace-search +# +# ╔══════════╤═════════╤════════════╤═════════════╤══════════════╗ +# ║ Run │ Active │ Accesses │ Activations │ Passivations ║ +# ╠══════════╪═════════╪════════════╪═════════════╪══════════════╣ +# ║ LRU 250k │ 3.07 % │ 37,656,092 │ 36,498,516 │ 36,248,516 ║ +# ╟──────────┼─────────┼────────────┼─────────────┼──────────────╢ +# ║ LRU 500k │ 7.53 % │ 37,656,092 │ 34,822,122 │ 34,322,122 ║ +# ╟──────────┼─────────┼────────────┼─────────────┼──────────────╢ +# ║ LRU 1M │ 25.37 % │ 37,656,092 │ 28,102,287 │ 27,102,287 ║ +# ╚══════════╧═════════╧════════════╧═════════════╧══════════════╝ +# + +arc-traces="arc-traces" +arc-traces=${?ARC_TRACES} + +akka.cluster.sharding { + passivation.simulator { + runs = [ + { + name = "LRU 250k" + shards = 100 + regions = 10 + pattern = arc-search-merged + strategy = lru-25k + }, + { + name = "LRU 500k" + shards = 100 + regions = 10 + pattern = arc-search-merged + strategy = lru-50k + }, + { + name = "LRU 1M" + shards = 100 + regions = 10 + pattern = arc-search-merged + strategy = lru-100k + } + ] + + print-detailed-stats = true + + arc-search-merged { + pattern = trace + trace { + format = arc + path = ${arc-traces}"/MergeS.lis" + } + } + + lru-25k { + strategy = least-recently-used + least-recently-used { + per-region-limit = 25000 + } + } + + lru-50k { + strategy = least-recently-used + least-recently-used { + per-region-limit = 50000 + } + } + + lru-100k { + strategy = least-recently-used + least-recently-used { + per-region-limit = 100000 + } + } + } +} diff --git a/akka-cluster-sharding/src/test/resources/reference.conf b/akka-cluster-sharding/src/test/resources/reference.conf new file mode 100644 index 0000000000..73f56a668d --- /dev/null +++ b/akka-cluster-sharding/src/test/resources/reference.conf @@ -0,0 +1,46 @@ +akka.cluster.sharding { + passivation.simulator { + runs = [] + + print-detailed-stats = false + + run-defaults { + shards = 100 + regions = 10 + } + + strategy-defaults { + strategy = least-recently-used + least-recently-used { + per-region-limit = 100000 + } + } + + pattern-defaults { + synthetic { + sequence { + start = 1 + } + uniform { + min = 1 + max = 10000000 + } + exponential { + mean = 100 + } + hotspot { + min = 1 + max = 10000000 + hot = 0.1 + rate = 0.3 + } + zipfian { + min = 1 + max = 10000000 + constant = 0.99 + scrambled = true + } + } + } + } +} diff --git a/akka-cluster-sharding/src/test/resources/synthetic-zipfian.conf b/akka-cluster-sharding/src/test/resources/synthetic-zipfian.conf new file mode 100644 index 0000000000..1de3545b23 --- /dev/null +++ b/akka-cluster-sharding/src/test/resources/synthetic-zipfian.conf @@ -0,0 +1,51 @@ +# +# Run with synthetically generated access events with a scrambled zipfian distribution. +# +# > akka-cluster-sharding/Test/runMain akka.cluster.sharding.passivation.simulator.Simulator synthetic-zipfian +# +# ╔══════════╤═════════╤════════════╤═════════════╤══════════════╗ +# ║ Run │ Active │ Accesses │ Activations │ Passivations ║ +# ╠══════════╪═════════╪════════════╪═════════════╪══════════════╣ +# ║ LRU 100k │ 40.47 % │ 50,000,000 │ 29,764,380 │ 29,664,380 ║ +# ╚══════════╧═════════╧════════════╧═════════════╧══════════════╝ +# + +akka.cluster.sharding { + passivation.simulator { + runs = [ + { + name = "LRU 100k" + shards = 100 + regions = 10 + pattern = scrambled-zipfian + strategy = lru-10k + } + ] + + print-detailed-stats = true + + # scrambled zipfian distribution + # generate 50M events over 10M ids + scrambled-zipfian { + pattern = synthetic + synthetic { + events = 50000000 + generator = zipfian + zipfian { + min = 1 + max = 10000000 + scrambled = true + } + } + } + + # LRU strategy with 10k limit in each of 10 regions + # total limit across cluster of 100k (1% of id space) + lru-10k { + strategy = least-recently-used + least-recently-used { + per-region-limit = 10000 + } + } + } +} diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/simulator/AccessPattern.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/simulator/AccessPattern.scala new file mode 100644 index 0000000000..361503a2c1 --- /dev/null +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/simulator/AccessPattern.scala @@ -0,0 +1,104 @@ +/* + * Copyright (C) 2021 Lightbend Inc. + */ + +package akka.cluster.sharding.passivation.simulator + +import akka.NotUsed +import akka.cluster.sharding.ShardRegion.EntityId +import akka.stream.scaladsl._ +import akka.util.ByteString + +import java.nio.file.Paths + +trait AccessPattern { + def entityIds: Source[EntityId, NotUsed] +} + +abstract class SyntheticGenerator(events: Int) extends AccessPattern { + protected def createGenerator(): site.ycsb.generator.NumberGenerator + + override def entityIds: Source[EntityId, NotUsed] = Source.unfold(createGenerator() -> 0) { + case (_, count) if count >= events => None + case (generator, count) => Option(generator.nextValue().longValue.toString).map(generator -> (count + 1) -> _) + } +} + +object SyntheticGenerator { + import site.ycsb.generator._ + + /** + * Generate a sequence of unique id events. + */ + final class Sequence(start: Long, events: Int) extends SyntheticGenerator(events) { + override protected def createGenerator(): NumberGenerator = new CounterGenerator(start) + } + + /** + * Generate id events randomly using a uniform distribution, from the inclusive range min to max. + */ + final class Uniform(min: Long, max: Long, events: Int) extends SyntheticGenerator(events) { + override protected def createGenerator(): NumberGenerator = new UniformLongGenerator(min, max) + } + + /** + * Generate id events based on an exponential distribution given the mean (expected value) of the distribution. + */ + final class Exponential(mean: Double, events: Int) extends SyntheticGenerator(events) { + override protected def createGenerator(): NumberGenerator = new ExponentialGenerator(mean) + } + + /** + * Generate id events for a hotspot distribution, where x% ('rate') of operations access y% ('hot') of the id space. + */ + final class Hotspot(min: Long, max: Long, hot: Double, rate: Double, events: Int) extends SyntheticGenerator(events) { + override protected def createGenerator(): NumberGenerator = new HotspotIntegerGenerator(min, max, hot, rate) + } + + /** + * Generate id events where some ids in the id space are more popular than others, based on a zipfian distribution. + */ + final class Zipfian(min: Long, max: Long, constant: Double, events: Int) extends SyntheticGenerator(events) { + override protected def createGenerator(): NumberGenerator = new ZipfianGenerator(min, max, constant) + } + + /** + * Generate id events where some ids are more popular than others, based on a zipfian distribution, and the popular + * ids are scattered over the id space. + */ + final class ScrambledZipfian(min: Long, max: Long, constant: Double, events: Int) extends SyntheticGenerator(events) { + override protected def createGenerator(): NumberGenerator = new ScrambledZipfianGenerator(min, max, constant) + } +} + +abstract class TraceFileReader(path: String) extends AccessPattern { + protected def lines: Source[String, NotUsed] = + FileIO + .fromPath(Paths.get(path)) + .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true)) + .map(_.utf8String) + .mapMaterializedValue(_ => NotUsed) +} + +object TraceFileReader { + + /** + * Simple trace file format: entity id per line. + */ + final class Simple(path: String) extends TraceFileReader(path: String) { + override def entityIds: Source[EntityId, NotUsed] = lines + } + + /** + * Read traces provided with the "ARC" paper. + * Nimrod Megiddo and Dharmendra S. Modha, "ARC: A Self-Tuning, Low Overhead Replacement Cache". + */ + final class Arc(path: String) extends TraceFileReader(path: String) { + override def entityIds: Source[EntityId, NotUsed] = lines.mapConcat { line => + val parts = line.split(" ") + val startId = parts(0).toLong + val numberOfIds = parts(1).toInt + (startId until (startId + numberOfIds)).map(_.toString) + } + } +} diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/simulator/Simulator.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/simulator/Simulator.scala new file mode 100644 index 0000000000..9d025e8723 --- /dev/null +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/simulator/Simulator.scala @@ -0,0 +1,213 @@ +/* + * Copyright (C) 2021 Lightbend Inc. + */ + +package akka.cluster.sharding.passivation.simulator + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.cluster.sharding.internal.{ EntityPassivationStrategy, LeastRecentlyUsedEntityPassivationStrategy } +import akka.stream.scaladsl.{ Flow, Source } +import com.typesafe.config.ConfigFactory + +import scala.collection.{ immutable, mutable } +import scala.concurrent.{ ExecutionContext, Future } +import scala.util.{ Failure, Success } + +/** + * Simulator for testing the efficiency of passivation strategies. + * For access pattern, can read cache replacement policy traces or generate synthetic events. + */ +object Simulator { + def main(args: Array[String]): Unit = { + val configName = args.headOption + println(s"Running passivation simulator with [${configName.getOrElse("default application")}] config") + val config = configName.fold(ConfigFactory.load)(ConfigFactory.load) + run(SimulatorSettings(config)) + } + + final case class Results(name: String, stats: ShardingStats) + + def run(settings: SimulatorSettings): Unit = { + val simulations = settings.runs.map(Simulation.apply) + implicit val system: ActorSystem = ActorSystem("simulator") + implicit val ec: ExecutionContext = system.dispatcher + Source(simulations) + .runFoldAsync(Seq.empty[Results]) { (results, simulation) => + println(s"Running [${simulation.name}] ...") + simulate(simulation).map(stats => results :+ Results(simulation.name, stats)) + } + .onComplete { + case Success(allResults) => + val summary = allResults.map { results => + if (settings.printDetailedStats) { + println(results.name) + PrintData(DataTable(results.stats)) + println() + } + results.name +: DataTable.row(results.stats.totals) + } + PrintData(DataTable(DataTable.Headers.RunStats, summary)) + system.terminate() + case Failure(exception) => + println(s"Failed to run simulations: $exception") + system.terminate() + } + } + + final case class Simulation( + name: String, + numberOfShards: Int, + numberOfRegions: Int, + accessPattern: AccessPattern, + createStrategy: () => EntityPassivationStrategy) + + object Simulation { + def apply(runSettings: SimulatorSettings.RunSettings): Simulation = + Simulation( + name = runSettings.name, + numberOfShards = runSettings.shards, + numberOfRegions = runSettings.regions, + accessPattern = accessPattern(runSettings), + createStrategy = strategyCreator(runSettings)) + + def accessPattern(runSettings: SimulatorSettings.RunSettings): AccessPattern = runSettings.pattern match { + case SimulatorSettings.PatternSettings.Synthetic(generator, events) => + generator match { + case SimulatorSettings.PatternSettings.Synthetic.Sequence(start) => + new SyntheticGenerator.Sequence(start, events) + case SimulatorSettings.PatternSettings.Synthetic.Uniform(min, max) => + new SyntheticGenerator.Uniform(min, max, events) + case SimulatorSettings.PatternSettings.Synthetic.Exponential(mean) => + new SyntheticGenerator.Exponential(mean, events) + case SimulatorSettings.PatternSettings.Synthetic.Hotspot(min, max, hot, rate) => + new SyntheticGenerator.Hotspot(min, max, hot, rate, events) + case SimulatorSettings.PatternSettings.Synthetic.Zipfian(min, max, constant, scrambled) => + if (scrambled) new SyntheticGenerator.ScrambledZipfian(min, max, constant, events) + else new SyntheticGenerator.Zipfian(min, max, constant, events) + } + case SimulatorSettings.PatternSettings.Trace(path, format) => + format match { + case "arc" => new TraceFileReader.Arc(path) + case "simple" => new TraceFileReader.Simple(path) + case _ => sys.error(s"Unknown trace file format [$format]") + } + } + + def strategyCreator(runSettings: SimulatorSettings.RunSettings): () => EntityPassivationStrategy = + runSettings.strategy match { + case SimulatorSettings.StrategySettings.LeastRecentlyUsed(perRegionLimit) => + () => new LeastRecentlyUsedEntityPassivationStrategy(perRegionLimit) + } + } + + object Id { + def hashed(id: String, n: Int): String = + padded(math.abs(id.hashCode % n), n - 1) + + def padded(id: Int, max: Int): String = { + val maxDigits = math.floor(math.log10(max)).toInt + 1 + s"%0${maxDigits}d".format(id) + } + } + + type RegionId = String + type ShardId = String + type EntityId = String + + final case class Access(regionId: RegionId, shardId: ShardId, entityId: EntityId) + + sealed trait Event + final case class Accessed(regionId: RegionId, shardId: ShardId, entityId: EntityId) extends Event + final case class Activated(regionId: RegionId, shardId: ShardId, entityId: EntityId) extends Event + final case class Passivated(regionId: RegionId, shardId: ShardId, entityIds: immutable.Seq[EntityId]) extends Event + + def simulate(simulation: Simulation)(implicit system: ActorSystem): Future[ShardingStats] = + simulation.accessPattern.entityIds + .via(ShardAllocation(simulation.numberOfShards, simulation.numberOfRegions)) + .via(ShardingState(simulation.createStrategy)) + .runWith(SimulatorStats()) + + object ShardAllocation { + def apply(numberOfShards: Int, numberOfRegions: Int): Flow[EntityId, Access, NotUsed] = + Flow[EntityId].statefulMapConcat(() => { + val allocation = new ShardAllocation(numberOfShards, numberOfRegions) + entityId => List(allocation.access(entityId)) + }) + } + + final class ShardAllocation(numberOfShards: Int, numberOfRegions: Int) { + private var allocationMap: Map[RegionId, Set[ShardId]] = (1 to numberOfRegions).map { id => + Id.padded(id, numberOfRegions) -> Set.empty[ShardId] + }.toMap + + def access(entityId: EntityId): Access = { + val shardId = extractShardId(entityId) + val regionId = currentRegion(shardId).getOrElse(allocateShard(shardId)) + Access(regionId, shardId, entityId) + } + + // simulate default shard id extractor + private def extractShardId(entityId: EntityId): ShardId = + Id.hashed(entityId, numberOfShards) + + private def currentRegion(shardId: ShardId): Option[RegionId] = + allocationMap.collectFirst { case (regionId, allocated) if allocated.contains(shardId) => regionId } + + // simulate default least shard allocation + private def allocateShard(shardId: ShardId): RegionId = { + val regionId = allocationMap.toSeq.sortBy(_._1).minBy(_._2)(Ordering.by(_.size))._1 + allocationMap = allocationMap.updated(regionId, allocationMap(regionId) + shardId) + regionId + } + } + + object ShardingState { + def apply(createStrategy: () => EntityPassivationStrategy): Flow[Access, Event, NotUsed] = + Flow[Access].statefulMapConcat(() => { + val state = new ShardingState(createStrategy) + access => state.process(access) + }) + } + + final class ShardingState(createStrategy: () => EntityPassivationStrategy) { + private val active = mutable.Map.empty[RegionId, mutable.Map[ShardId, ShardState]] + + def process(access: Access): Seq[Event] = { + val regionId = access.regionId + val shardId = access.shardId + val region = active.getOrElseUpdate(regionId, mutable.Map.empty) + val alreadyActive = region.contains(shardId) + val shard = region.getOrElseUpdate(shardId, new ShardState(regionId, shardId, createStrategy())) + val passivated = if (!alreadyActive) region.values.toSeq.flatMap(_.updated(region.size)) else Nil + passivated ++ shard.accessed(access.entityId) + } + } + + final class ShardState(regionId: RegionId, shardId: ShardId, strategy: EntityPassivationStrategy) { + private val activeEntities = mutable.Set.empty[EntityId] + + def updated(activeShards: Int): immutable.Seq[Event] = { + val passivateEntities = strategy.shardsUpdated(activeShards) + passivateEntities.foreach(activeEntities.remove) + if (passivateEntities.isEmpty) Nil + else List(Passivated(regionId, shardId, passivateEntities)) + } + + def accessed(entityId: EntityId): immutable.Seq[Event] = { + val changes = if (activeEntities.contains(entityId)) { + strategy.entityTouched(entityId) + Nil + } else { + activeEntities += entityId + val passivateEntities = strategy.entityCreated(entityId) + passivateEntities.foreach(activeEntities.remove) + val passivated = + if (passivateEntities.isEmpty) Nil + else List(Passivated(regionId, shardId, passivateEntities)) + passivated :+ Activated(regionId, shardId, entityId) + } + changes :+ Accessed(regionId, shardId, entityId) + } + } +} diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/simulator/SimulatorSettings.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/simulator/SimulatorSettings.scala new file mode 100644 index 0000000000..0cc6333cef --- /dev/null +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/simulator/SimulatorSettings.scala @@ -0,0 +1,123 @@ +/* + * Copyright (C) 2021 Lightbend Inc. + */ + +package akka.cluster.sharding.passivation.simulator + +import akka.japi.Util.immutableSeq +import com.typesafe.config.Config + +import java.util.Locale +import scala.collection.immutable + +final case class SimulatorSettings(runs: immutable.Seq[SimulatorSettings.RunSettings], printDetailedStats: Boolean) + +object SimulatorSettings { + def apply(testConfig: Config): SimulatorSettings = { + val simulatorConfig = testConfig.getConfig("akka.cluster.sharding.passivation.simulator") + val runDefaults = simulatorConfig.getConfig("run-defaults") + val runs = immutableSeq(simulatorConfig.getConfigList("runs")).map { runConfig => + RunSettings(simulatorConfig, runConfig.withFallback(runDefaults)) + } + val printDetailedStats = simulatorConfig.getBoolean("print-detailed-stats") + SimulatorSettings(runs, printDetailedStats) + } + + final case class RunSettings( + name: String, + shards: Int, + regions: Int, + strategy: StrategySettings, + pattern: PatternSettings) + + object RunSettings { + def apply(simulatorConfig: Config, runConfig: Config): RunSettings = { + val name = runConfig.getString("name") + val shards = runConfig.getInt("shards") + val regions = runConfig.getInt("regions") + val strategy = StrategySettings(simulatorConfig, runConfig.getString("strategy")) + val pattern = PatternSettings(simulatorConfig, runConfig.getString("pattern")) + RunSettings(name, shards, regions, strategy, pattern) + } + } + + sealed trait StrategySettings + + object StrategySettings { + final case class LeastRecentlyUsed(perRegionLimit: Int) extends StrategySettings + + def apply(simulatorConfig: Config, strategy: String): StrategySettings = { + val config = simulatorConfig.getConfig(strategy).withFallback(simulatorConfig.getConfig("strategy-defaults")) + lowerCase(config.getString("strategy")) match { + case "least-recently-used" => LeastRecentlyUsed(config.getInt("least-recently-used.per-region-limit")) + case _ => sys.error(s"Unknown strategy for [$strategy]") + } + } + } + + sealed trait PatternSettings + + object PatternSettings { + final case class Synthetic(generator: Synthetic.Generator, events: Int) extends PatternSettings + + object Synthetic { + sealed trait Generator + final case class Sequence(start: Long) extends Generator + final case class Uniform(min: Long, max: Long) extends Generator + final case class Exponential(mean: Double) extends Generator + final case class Hotspot(min: Long, max: Long, hot: Double, rate: Double) extends Generator + final case class Zipfian(min: Long, max: Long, constant: Double, scrambled: Boolean) extends Generator + + def apply(patternConfig: Config): Synthetic = { + val config = patternConfig.getConfig("synthetic") + val generator = lowerCase(config.getString("generator")) match { + case "sequence" => + val start = config.getLong("sequence.start") + Sequence(start) + case "uniform" => + val min = config.getLong("uniform.min") + val max = config.getLong("uniform.max") + Uniform(min, max) + case "exponential" => + val mean = config.getDouble("exponential.mean") + Exponential(mean) + case "hotspot" => + val min = config.getLong("hotspot.min") + val max = config.getLong("hotspot.max") + val hot = config.getDouble("hotspot.hot") + val rate = config.getDouble("hotspot.rate") + Hotspot(min, max, hot, rate) + case "zipfian" => + val min = config.getLong("zipfian.min") + val max = config.getLong("zipfian.max") + val constant = config.getDouble("zipfian.constant") + val scrambled = config.getBoolean("zipfian.scrambled") + Zipfian(min, max, constant, scrambled) + } + Synthetic(generator, config.getInt("events")) + } + } + + final case class Trace(path: String, format: String) extends PatternSettings + + object Trace { + def apply(patternConfig: Config): Trace = { + val config = patternConfig.getConfig("trace") + val path = config.getString("path") + val format = lowerCase(config.getString("format")) + Trace(path, format) + } + } + + def apply(simulatorConfig: Config, pattern: String): PatternSettings = { + val config = simulatorConfig.getConfig(pattern).withFallback(simulatorConfig.getConfig("pattern-defaults")) + lowerCase(config.getString("pattern")) match { + case "synthetic" => Synthetic(config) + case "trace" => Trace(config) + case _ => sys.error(s"Unknown pattern for [$pattern]") + } + } + } + + private def lowerCase(s: String): String = s.toLowerCase(Locale.ROOT) +} diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/simulator/SimulatorStats.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/simulator/SimulatorStats.scala new file mode 100644 index 0000000000..e06f67a5f8 --- /dev/null +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/simulator/SimulatorStats.scala @@ -0,0 +1,140 @@ +/* + * Copyright (C) 2021 Lightbend Inc. + */ + +package akka.cluster.sharding.passivation.simulator + +import akka.stream.scaladsl.Sink + +import scala.concurrent.Future + +object SimulatorStats { + def apply(): Sink[Simulator.Event, Future[ShardingStats]] = + Sink.fold(ShardingStats()) { + case (stats, Simulator.Accessed(regionId, shardId, _)) => + stats.accessed(regionId, shardId) + case (stats, Simulator.Activated(regionId, shardId, _)) => + stats.activated(regionId, shardId) + case (stats, Simulator.Passivated(regionId, shardId, entityIds)) => + stats.passivated(regionId, shardId, entityIds.size) + } +} + +final case class EntityStats(accesses: Int = 0, activations: Int = 0, passivations: Int = 0) { + def accessed(): EntityStats = copy(accesses + 1, activations, passivations) + def activated(): EntityStats = copy(accesses, activations + 1, passivations) + def passivated(n: Int): EntityStats = copy(accesses, activations, passivations + n) + + def activePercent: Float = (1 - (activations.toFloat / accesses)) * 100 + + def ++(other: EntityStats): EntityStats = + EntityStats(accesses + other.accesses, activations + other.activations, passivations + other.passivations) +} + +final case class RegionStats(shardStats: Map[String, EntityStats] = Map.empty) { + def accessed(shardId: String): RegionStats = + RegionStats(shardStats.updated(shardId, shardStats.getOrElse(shardId, EntityStats()).accessed())) + + def activated(shardId: String): RegionStats = + RegionStats(shardStats.updated(shardId, shardStats.getOrElse(shardId, EntityStats()).activated())) + + def passivated(shardId: String, n: Int): RegionStats = + RegionStats(shardStats.updated(shardId, shardStats.getOrElse(shardId, EntityStats()).passivated(n))) + + def totals: EntityStats = shardStats.values.foldLeft(EntityStats())(_ ++ _) +} + +final case class ShardingStats(regionStats: Map[String, RegionStats] = Map.empty) { + def accessed(regionId: String, shardId: String): ShardingStats = + ShardingStats(regionStats.updated(regionId, regionStats.getOrElse(regionId, RegionStats()).accessed(shardId))) + + def activated(regionId: String, shardId: String): ShardingStats = + ShardingStats(regionStats.updated(regionId, regionStats.getOrElse(regionId, RegionStats()).activated(shardId))) + + def passivated(regionId: String, shardId: String, n: Int): ShardingStats = + ShardingStats(regionStats.updated(regionId, regionStats.getOrElse(regionId, RegionStats()).passivated(shardId, n))) + + def ++(other: ShardingStats): ShardingStats = + ShardingStats(regionStats ++ other.regionStats) + + def totals: EntityStats = regionStats.values.foldLeft(EntityStats())(_ ++ _.totals) +} + +case class DataTable(headers: DataTable.Row, rows: Seq[DataTable.Row]) + +object DataTable { + type Row = IndexedSeq[String] + + object Headers { + val EntityStats: Row = IndexedSeq("Active", "Accesses", "Activations", "Passivations") + val RegionStats: Row = "Shard" +: EntityStats + val ShardingStats: Row = "Region" +: RegionStats + val RunStats: Row = "Run" +: EntityStats + } + + def apply(stats: EntityStats): DataTable = + DataTable(Headers.EntityStats, Seq(row(stats))) + + def row(stats: EntityStats): Row = + IndexedSeq( + f"${stats.activePercent}%.2f %%", + f"${stats.accesses}%,d", + f"${stats.activations}%,d", + f"${stats.passivations}%,d") + + def apply(stats: RegionStats): DataTable = + DataTable(Headers.RegionStats, stats.shardStats.toSeq.sortBy(_._1).flatMap { + case (shardId, stats) => DataTable(stats).rows.map(shardId +: _) + }) + + def apply(stats: ShardingStats): DataTable = + DataTable(Headers.ShardingStats, stats.regionStats.toSeq.sortBy(_._1).flatMap { + case (regionId, stats) => DataTable(stats).rows.map(regionId +: _) + }) +} + +object PrintData { + def apply(data: DataTable): Unit = { + val columnWidths = determineColumnWidths(data) + val builder = new StringBuilder + builder ++= topDivider(columnWidths) + builder ++= line(data.headers, columnWidths) + data.rows.take(1).foreach { row => + builder ++= headerDivider(columnWidths) + builder ++= line(row, columnWidths) + } + data.rows.drop(1).foreach { rowData => + builder ++= rowDivider(columnWidths) + builder ++= line(rowData, columnWidths) + } + builder ++= bottomDivider(columnWidths) + builder ++= "\n" + println(builder.result()) + } + + private def determineColumnWidths(data: DataTable): IndexedSeq[Int] = { + val allRows = data.headers +: data.rows + data.headers.indices.map(i => allRows.map(row => row(i).length).max) + } + + private def topDivider(columnWidths: Seq[Int]): String = + divider("╔", "═", "╤", "╗", columnWidths) + + private def headerDivider(columnWidths: Seq[Int]): String = + divider("╠", "═", "╪", "╣", columnWidths) + + private def rowDivider(columnWidths: Seq[Int]): String = + divider("╟", "─", "┼", "╢", columnWidths) + + private def bottomDivider(columnWidths: Seq[Int]): String = + divider("╚", "═", "╧", "╝", columnWidths) + + private def divider(start: String, line: String, separator: String, end: String, columnWidths: Seq[Int]): String = + columnWidths.map(width => line * (width + 2)).mkString(start, separator, end) + "\n" + + private def line(row: DataTable.Row, columnWidths: Seq[Int]): String = + row.zip(columnWidths).map({ case (cell, width) => pad(cell, width) }).mkString("║ ", " │ ", " ║") + "\n" + + private def pad(string: String, width: Int): String = + " " * (width - string.length) + string +} diff --git a/project/AkkaDisciplinePlugin.scala b/project/AkkaDisciplinePlugin.scala index da653b1733..8ea90a19db 100644 --- a/project/AkkaDisciplinePlugin.scala +++ b/project/AkkaDisciplinePlugin.scala @@ -125,7 +125,8 @@ object AkkaDisciplinePlugin extends AutoPlugin { // https://github.com/akka/akka/issues/26119 Compile / doc / scalacOptions --= disciplineScalacOptions.toSeq :+ "-Xfatal-warnings", // having discipline warnings in console is just an annoyance - Compile / console / scalacOptions --= disciplineScalacOptions.toSeq) + Compile / console / scalacOptions --= disciplineScalacOptions.toSeq, + Test / console / scalacOptions --= disciplineScalacOptions.toSeq) } else { // we still need these in opt-out since the annotations are present nowarnSettings ++ Seq(Compile / scalacOptions += "-deprecation") diff --git a/project/Dependencies.scala b/project/Dependencies.scala index d64da8d8eb..f1db983102 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -181,6 +181,9 @@ object Dependencies { val reactiveStreamsTck = "org.reactivestreams" % "reactive-streams-tck" % reactiveStreamsVersion % "test" // CC0 val protobufRuntime = "com.google.protobuf" % "protobuf-java" % protobufJavaVersion % "test" + + // YCSB (Yahoo Cloud Serving Benchmark https://ycsb.site) + val ycsb = "site.ycsb" % "core" % "0.17.0" % "test" // ApacheV2 } object Provided { @@ -262,7 +265,8 @@ object Dependencies { Provided.levelDBNative, Test.junit, Test.scalatest.value, - Test.commonsIo) + Test.commonsIo, + Test.ycsb) val clusterMetrics = l ++= Seq( Provided.sigarLoader,