Add access pattern simulator for testing entity passivation strategies (#30884)

* with licence comment for ycsb
This commit is contained in:
Peter Vlugter 2021-11-17 01:00:05 +13:00 committed by GitHub
parent 058bf1029a
commit a601f36b20
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 864 additions and 2 deletions

View file

@ -0,0 +1,98 @@
#
# Run the "database" trace (DS1) from the authors of the ARC algorithm.
#
# Nimrod Megiddo and Dharmendra S. Modha, "ARC: A Self-Tuning, Low Overhead Replacement Cache".
#
# Download traces from the author's home page:
# https://researcher.watson.ibm.com/researcher/view_person_subpage.php?id=4700
#
# > akka-cluster-sharding/Test/runMain akka.cluster.sharding.passivation.simulator.Simulator arc-trace-database
#
# ╔════════╤═════════╤════════════╤═════════════╤══════════════╗
# ║ Run │ Active │ Accesses │ Activations │ Passivations ║
# ╠════════╪═════════╪════════════╪═════════════╪══════════════╣
# ║ LRU 1M │ 3.09 % │ 43,704,979 │ 42,356,500 │ 41,356,500 ║
# ╟────────┼─────────┼────────────┼─────────────┼──────────────╢
# ║ LRU 2M │ 10.75 % │ 43,704,979 │ 39,007,141 │ 37,007,141 ║
# ╟────────┼─────────┼────────────┼─────────────┼──────────────╢
# ║ LRU 4M │ 20.24 % │ 43,704,979 │ 34,857,131 │ 30,857,131 ║
# ╟────────┼─────────┼────────────┼─────────────┼──────────────╢
# ║ LRU 8M │ 43.03 % │ 43,704,979 │ 24,896,638 │ 16,896,638 ║
# ╚════════╧═════════╧════════════╧═════════════╧══════════════╝
#
arc-traces="arc-traces"
arc-traces=${?ARC_TRACES}
akka.cluster.sharding {
passivation.simulator {
runs = [
{
name = "LRU 1M"
shards = 100
regions = 10
pattern = arc-database
strategy = lru-100k
},
{
name = "LRU 2M"
shards = 100
regions = 10
pattern = arc-database
strategy = lru-200k
},
{
name = "LRU 4M"
shards = 100
regions = 10
pattern = arc-database
strategy = lru-400k
},
{
name = "LRU 8M"
shards = 100
regions = 10
pattern = arc-database
strategy = lru-800k
}
]
print-detailed-stats = true
arc-database {
pattern = trace
trace {
format = arc
path = ${arc-traces}"/DS1.lis"
}
}
lru-100k {
strategy = least-recently-used
least-recently-used {
per-region-limit = 100000
}
}
lru-200k {
strategy = least-recently-used
least-recently-used {
per-region-limit = 200000
}
}
lru-400k {
strategy = least-recently-used
least-recently-used {
per-region-limit = 400000
}
}
lru-800k {
strategy = least-recently-used
least-recently-used {
per-region-limit = 800000
}
}
}
}

View file

@ -0,0 +1,82 @@
#
# Run the merged "search" trace (MergeS) from the authors of the ARC algorithm.
#
# Nimrod Megiddo and Dharmendra S. Modha, "ARC: A Self-Tuning, Low Overhead Replacement Cache".
#
# Download traces from the author's home page:
# https://researcher.watson.ibm.com/researcher/view_person_subpage.php?id=4700
#
# > akka-cluster-sharding/Test/runMain akka.cluster.sharding.passivation.simulator.Simulator arc-trace-search
#
# ╔══════════╤═════════╤════════════╤═════════════╤══════════════╗
# ║ Run │ Active │ Accesses │ Activations │ Passivations ║
# ╠══════════╪═════════╪════════════╪═════════════╪══════════════╣
# ║ LRU 250k │ 3.07 % │ 37,656,092 │ 36,498,516 │ 36,248,516 ║
# ╟──────────┼─────────┼────────────┼─────────────┼──────────────╢
# ║ LRU 500k │ 7.53 % │ 37,656,092 │ 34,822,122 │ 34,322,122 ║
# ╟──────────┼─────────┼────────────┼─────────────┼──────────────╢
# ║ LRU 1M │ 25.37 % │ 37,656,092 │ 28,102,287 │ 27,102,287 ║
# ╚══════════╧═════════╧════════════╧═════════════╧══════════════╝
#
arc-traces="arc-traces"
arc-traces=${?ARC_TRACES}
akka.cluster.sharding {
passivation.simulator {
runs = [
{
name = "LRU 250k"
shards = 100
regions = 10
pattern = arc-search-merged
strategy = lru-25k
},
{
name = "LRU 500k"
shards = 100
regions = 10
pattern = arc-search-merged
strategy = lru-50k
},
{
name = "LRU 1M"
shards = 100
regions = 10
pattern = arc-search-merged
strategy = lru-100k
}
]
print-detailed-stats = true
arc-search-merged {
pattern = trace
trace {
format = arc
path = ${arc-traces}"/MergeS.lis"
}
}
lru-25k {
strategy = least-recently-used
least-recently-used {
per-region-limit = 25000
}
}
lru-50k {
strategy = least-recently-used
least-recently-used {
per-region-limit = 50000
}
}
lru-100k {
strategy = least-recently-used
least-recently-used {
per-region-limit = 100000
}
}
}
}

View file

@ -0,0 +1,46 @@
akka.cluster.sharding {
passivation.simulator {
runs = []
print-detailed-stats = false
run-defaults {
shards = 100
regions = 10
}
strategy-defaults {
strategy = least-recently-used
least-recently-used {
per-region-limit = 100000
}
}
pattern-defaults {
synthetic {
sequence {
start = 1
}
uniform {
min = 1
max = 10000000
}
exponential {
mean = 100
}
hotspot {
min = 1
max = 10000000
hot = 0.1
rate = 0.3
}
zipfian {
min = 1
max = 10000000
constant = 0.99
scrambled = true
}
}
}
}
}

View file

@ -0,0 +1,51 @@
#
# Run with synthetically generated access events with a scrambled zipfian distribution.
#
# > akka-cluster-sharding/Test/runMain akka.cluster.sharding.passivation.simulator.Simulator synthetic-zipfian
#
# ╔══════════╤═════════╤════════════╤═════════════╤══════════════╗
# ║ Run │ Active │ Accesses │ Activations │ Passivations ║
# ╠══════════╪═════════╪════════════╪═════════════╪══════════════╣
# ║ LRU 100k │ 40.47 % │ 50,000,000 │ 29,764,380 │ 29,664,380 ║
# ╚══════════╧═════════╧════════════╧═════════════╧══════════════╝
#
akka.cluster.sharding {
passivation.simulator {
runs = [
{
name = "LRU 100k"
shards = 100
regions = 10
pattern = scrambled-zipfian
strategy = lru-10k
}
]
print-detailed-stats = true
# scrambled zipfian distribution
# generate 50M events over 10M ids
scrambled-zipfian {
pattern = synthetic
synthetic {
events = 50000000
generator = zipfian
zipfian {
min = 1
max = 10000000
scrambled = true
}
}
}
# LRU strategy with 10k limit in each of 10 regions
# total limit across cluster of 100k (1% of id space)
lru-10k {
strategy = least-recently-used
least-recently-used {
per-region-limit = 10000
}
}
}
}

View file

@ -0,0 +1,104 @@
/*
* Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.passivation.simulator
import akka.NotUsed
import akka.cluster.sharding.ShardRegion.EntityId
import akka.stream.scaladsl._
import akka.util.ByteString
import java.nio.file.Paths
trait AccessPattern {
def entityIds: Source[EntityId, NotUsed]
}
abstract class SyntheticGenerator(events: Int) extends AccessPattern {
protected def createGenerator(): site.ycsb.generator.NumberGenerator
override def entityIds: Source[EntityId, NotUsed] = Source.unfold(createGenerator() -> 0) {
case (_, count) if count >= events => None
case (generator, count) => Option(generator.nextValue().longValue.toString).map(generator -> (count + 1) -> _)
}
}
object SyntheticGenerator {
import site.ycsb.generator._
/**
* Generate a sequence of unique id events.
*/
final class Sequence(start: Long, events: Int) extends SyntheticGenerator(events) {
override protected def createGenerator(): NumberGenerator = new CounterGenerator(start)
}
/**
* Generate id events randomly using a uniform distribution, from the inclusive range min to max.
*/
final class Uniform(min: Long, max: Long, events: Int) extends SyntheticGenerator(events) {
override protected def createGenerator(): NumberGenerator = new UniformLongGenerator(min, max)
}
/**
* Generate id events based on an exponential distribution given the mean (expected value) of the distribution.
*/
final class Exponential(mean: Double, events: Int) extends SyntheticGenerator(events) {
override protected def createGenerator(): NumberGenerator = new ExponentialGenerator(mean)
}
/**
* Generate id events for a hotspot distribution, where x% ('rate') of operations access y% ('hot') of the id space.
*/
final class Hotspot(min: Long, max: Long, hot: Double, rate: Double, events: Int) extends SyntheticGenerator(events) {
override protected def createGenerator(): NumberGenerator = new HotspotIntegerGenerator(min, max, hot, rate)
}
/**
* Generate id events where some ids in the id space are more popular than others, based on a zipfian distribution.
*/
final class Zipfian(min: Long, max: Long, constant: Double, events: Int) extends SyntheticGenerator(events) {
override protected def createGenerator(): NumberGenerator = new ZipfianGenerator(min, max, constant)
}
/**
* Generate id events where some ids are more popular than others, based on a zipfian distribution, and the popular
* ids are scattered over the id space.
*/
final class ScrambledZipfian(min: Long, max: Long, constant: Double, events: Int) extends SyntheticGenerator(events) {
override protected def createGenerator(): NumberGenerator = new ScrambledZipfianGenerator(min, max, constant)
}
}
abstract class TraceFileReader(path: String) extends AccessPattern {
protected def lines: Source[String, NotUsed] =
FileIO
.fromPath(Paths.get(path))
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
.map(_.utf8String)
.mapMaterializedValue(_ => NotUsed)
}
object TraceFileReader {
/**
* Simple trace file format: entity id per line.
*/
final class Simple(path: String) extends TraceFileReader(path: String) {
override def entityIds: Source[EntityId, NotUsed] = lines
}
/**
* Read traces provided with the "ARC" paper.
* Nimrod Megiddo and Dharmendra S. Modha, "ARC: A Self-Tuning, Low Overhead Replacement Cache".
*/
final class Arc(path: String) extends TraceFileReader(path: String) {
override def entityIds: Source[EntityId, NotUsed] = lines.mapConcat { line =>
val parts = line.split(" ")
val startId = parts(0).toLong
val numberOfIds = parts(1).toInt
(startId until (startId + numberOfIds)).map(_.toString)
}
}
}

View file

@ -0,0 +1,213 @@
/*
* Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.passivation.simulator
import akka.NotUsed
import akka.actor.ActorSystem
import akka.cluster.sharding.internal.{ EntityPassivationStrategy, LeastRecentlyUsedEntityPassivationStrategy }
import akka.stream.scaladsl.{ Flow, Source }
import com.typesafe.config.ConfigFactory
import scala.collection.{ immutable, mutable }
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success }
/**
* Simulator for testing the efficiency of passivation strategies.
* For access pattern, can read cache replacement policy traces or generate synthetic events.
*/
object Simulator {
def main(args: Array[String]): Unit = {
val configName = args.headOption
println(s"Running passivation simulator with [${configName.getOrElse("default application")}] config")
val config = configName.fold(ConfigFactory.load)(ConfigFactory.load)
run(SimulatorSettings(config))
}
final case class Results(name: String, stats: ShardingStats)
def run(settings: SimulatorSettings): Unit = {
val simulations = settings.runs.map(Simulation.apply)
implicit val system: ActorSystem = ActorSystem("simulator")
implicit val ec: ExecutionContext = system.dispatcher
Source(simulations)
.runFoldAsync(Seq.empty[Results]) { (results, simulation) =>
println(s"Running [${simulation.name}] ...")
simulate(simulation).map(stats => results :+ Results(simulation.name, stats))
}
.onComplete {
case Success(allResults) =>
val summary = allResults.map { results =>
if (settings.printDetailedStats) {
println(results.name)
PrintData(DataTable(results.stats))
println()
}
results.name +: DataTable.row(results.stats.totals)
}
PrintData(DataTable(DataTable.Headers.RunStats, summary))
system.terminate()
case Failure(exception) =>
println(s"Failed to run simulations: $exception")
system.terminate()
}
}
final case class Simulation(
name: String,
numberOfShards: Int,
numberOfRegions: Int,
accessPattern: AccessPattern,
createStrategy: () => EntityPassivationStrategy)
object Simulation {
def apply(runSettings: SimulatorSettings.RunSettings): Simulation =
Simulation(
name = runSettings.name,
numberOfShards = runSettings.shards,
numberOfRegions = runSettings.regions,
accessPattern = accessPattern(runSettings),
createStrategy = strategyCreator(runSettings))
def accessPattern(runSettings: SimulatorSettings.RunSettings): AccessPattern = runSettings.pattern match {
case SimulatorSettings.PatternSettings.Synthetic(generator, events) =>
generator match {
case SimulatorSettings.PatternSettings.Synthetic.Sequence(start) =>
new SyntheticGenerator.Sequence(start, events)
case SimulatorSettings.PatternSettings.Synthetic.Uniform(min, max) =>
new SyntheticGenerator.Uniform(min, max, events)
case SimulatorSettings.PatternSettings.Synthetic.Exponential(mean) =>
new SyntheticGenerator.Exponential(mean, events)
case SimulatorSettings.PatternSettings.Synthetic.Hotspot(min, max, hot, rate) =>
new SyntheticGenerator.Hotspot(min, max, hot, rate, events)
case SimulatorSettings.PatternSettings.Synthetic.Zipfian(min, max, constant, scrambled) =>
if (scrambled) new SyntheticGenerator.ScrambledZipfian(min, max, constant, events)
else new SyntheticGenerator.Zipfian(min, max, constant, events)
}
case SimulatorSettings.PatternSettings.Trace(path, format) =>
format match {
case "arc" => new TraceFileReader.Arc(path)
case "simple" => new TraceFileReader.Simple(path)
case _ => sys.error(s"Unknown trace file format [$format]")
}
}
def strategyCreator(runSettings: SimulatorSettings.RunSettings): () => EntityPassivationStrategy =
runSettings.strategy match {
case SimulatorSettings.StrategySettings.LeastRecentlyUsed(perRegionLimit) =>
() => new LeastRecentlyUsedEntityPassivationStrategy(perRegionLimit)
}
}
object Id {
def hashed(id: String, n: Int): String =
padded(math.abs(id.hashCode % n), n - 1)
def padded(id: Int, max: Int): String = {
val maxDigits = math.floor(math.log10(max)).toInt + 1
s"%0${maxDigits}d".format(id)
}
}
type RegionId = String
type ShardId = String
type EntityId = String
final case class Access(regionId: RegionId, shardId: ShardId, entityId: EntityId)
sealed trait Event
final case class Accessed(regionId: RegionId, shardId: ShardId, entityId: EntityId) extends Event
final case class Activated(regionId: RegionId, shardId: ShardId, entityId: EntityId) extends Event
final case class Passivated(regionId: RegionId, shardId: ShardId, entityIds: immutable.Seq[EntityId]) extends Event
def simulate(simulation: Simulation)(implicit system: ActorSystem): Future[ShardingStats] =
simulation.accessPattern.entityIds
.via(ShardAllocation(simulation.numberOfShards, simulation.numberOfRegions))
.via(ShardingState(simulation.createStrategy))
.runWith(SimulatorStats())
object ShardAllocation {
def apply(numberOfShards: Int, numberOfRegions: Int): Flow[EntityId, Access, NotUsed] =
Flow[EntityId].statefulMapConcat(() => {
val allocation = new ShardAllocation(numberOfShards, numberOfRegions)
entityId => List(allocation.access(entityId))
})
}
final class ShardAllocation(numberOfShards: Int, numberOfRegions: Int) {
private var allocationMap: Map[RegionId, Set[ShardId]] = (1 to numberOfRegions).map { id =>
Id.padded(id, numberOfRegions) -> Set.empty[ShardId]
}.toMap
def access(entityId: EntityId): Access = {
val shardId = extractShardId(entityId)
val regionId = currentRegion(shardId).getOrElse(allocateShard(shardId))
Access(regionId, shardId, entityId)
}
// simulate default shard id extractor
private def extractShardId(entityId: EntityId): ShardId =
Id.hashed(entityId, numberOfShards)
private def currentRegion(shardId: ShardId): Option[RegionId] =
allocationMap.collectFirst { case (regionId, allocated) if allocated.contains(shardId) => regionId }
// simulate default least shard allocation
private def allocateShard(shardId: ShardId): RegionId = {
val regionId = allocationMap.toSeq.sortBy(_._1).minBy(_._2)(Ordering.by(_.size))._1
allocationMap = allocationMap.updated(regionId, allocationMap(regionId) + shardId)
regionId
}
}
object ShardingState {
def apply(createStrategy: () => EntityPassivationStrategy): Flow[Access, Event, NotUsed] =
Flow[Access].statefulMapConcat(() => {
val state = new ShardingState(createStrategy)
access => state.process(access)
})
}
final class ShardingState(createStrategy: () => EntityPassivationStrategy) {
private val active = mutable.Map.empty[RegionId, mutable.Map[ShardId, ShardState]]
def process(access: Access): Seq[Event] = {
val regionId = access.regionId
val shardId = access.shardId
val region = active.getOrElseUpdate(regionId, mutable.Map.empty)
val alreadyActive = region.contains(shardId)
val shard = region.getOrElseUpdate(shardId, new ShardState(regionId, shardId, createStrategy()))
val passivated = if (!alreadyActive) region.values.toSeq.flatMap(_.updated(region.size)) else Nil
passivated ++ shard.accessed(access.entityId)
}
}
final class ShardState(regionId: RegionId, shardId: ShardId, strategy: EntityPassivationStrategy) {
private val activeEntities = mutable.Set.empty[EntityId]
def updated(activeShards: Int): immutable.Seq[Event] = {
val passivateEntities = strategy.shardsUpdated(activeShards)
passivateEntities.foreach(activeEntities.remove)
if (passivateEntities.isEmpty) Nil
else List(Passivated(regionId, shardId, passivateEntities))
}
def accessed(entityId: EntityId): immutable.Seq[Event] = {
val changes = if (activeEntities.contains(entityId)) {
strategy.entityTouched(entityId)
Nil
} else {
activeEntities += entityId
val passivateEntities = strategy.entityCreated(entityId)
passivateEntities.foreach(activeEntities.remove)
val passivated =
if (passivateEntities.isEmpty) Nil
else List(Passivated(regionId, shardId, passivateEntities))
passivated :+ Activated(regionId, shardId, entityId)
}
changes :+ Accessed(regionId, shardId, entityId)
}
}
}

View file

@ -0,0 +1,123 @@
/*
* Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.passivation.simulator
import akka.japi.Util.immutableSeq
import com.typesafe.config.Config
import java.util.Locale
import scala.collection.immutable
final case class SimulatorSettings(runs: immutable.Seq[SimulatorSettings.RunSettings], printDetailedStats: Boolean)
object SimulatorSettings {
def apply(testConfig: Config): SimulatorSettings = {
val simulatorConfig = testConfig.getConfig("akka.cluster.sharding.passivation.simulator")
val runDefaults = simulatorConfig.getConfig("run-defaults")
val runs = immutableSeq(simulatorConfig.getConfigList("runs")).map { runConfig =>
RunSettings(simulatorConfig, runConfig.withFallback(runDefaults))
}
val printDetailedStats = simulatorConfig.getBoolean("print-detailed-stats")
SimulatorSettings(runs, printDetailedStats)
}
final case class RunSettings(
name: String,
shards: Int,
regions: Int,
strategy: StrategySettings,
pattern: PatternSettings)
object RunSettings {
def apply(simulatorConfig: Config, runConfig: Config): RunSettings = {
val name = runConfig.getString("name")
val shards = runConfig.getInt("shards")
val regions = runConfig.getInt("regions")
val strategy = StrategySettings(simulatorConfig, runConfig.getString("strategy"))
val pattern = PatternSettings(simulatorConfig, runConfig.getString("pattern"))
RunSettings(name, shards, regions, strategy, pattern)
}
}
sealed trait StrategySettings
object StrategySettings {
final case class LeastRecentlyUsed(perRegionLimit: Int) extends StrategySettings
def apply(simulatorConfig: Config, strategy: String): StrategySettings = {
val config = simulatorConfig.getConfig(strategy).withFallback(simulatorConfig.getConfig("strategy-defaults"))
lowerCase(config.getString("strategy")) match {
case "least-recently-used" => LeastRecentlyUsed(config.getInt("least-recently-used.per-region-limit"))
case _ => sys.error(s"Unknown strategy for [$strategy]")
}
}
}
sealed trait PatternSettings
object PatternSettings {
final case class Synthetic(generator: Synthetic.Generator, events: Int) extends PatternSettings
object Synthetic {
sealed trait Generator
final case class Sequence(start: Long) extends Generator
final case class Uniform(min: Long, max: Long) extends Generator
final case class Exponential(mean: Double) extends Generator
final case class Hotspot(min: Long, max: Long, hot: Double, rate: Double) extends Generator
final case class Zipfian(min: Long, max: Long, constant: Double, scrambled: Boolean) extends Generator
def apply(patternConfig: Config): Synthetic = {
val config = patternConfig.getConfig("synthetic")
val generator = lowerCase(config.getString("generator")) match {
case "sequence" =>
val start = config.getLong("sequence.start")
Sequence(start)
case "uniform" =>
val min = config.getLong("uniform.min")
val max = config.getLong("uniform.max")
Uniform(min, max)
case "exponential" =>
val mean = config.getDouble("exponential.mean")
Exponential(mean)
case "hotspot" =>
val min = config.getLong("hotspot.min")
val max = config.getLong("hotspot.max")
val hot = config.getDouble("hotspot.hot")
val rate = config.getDouble("hotspot.rate")
Hotspot(min, max, hot, rate)
case "zipfian" =>
val min = config.getLong("zipfian.min")
val max = config.getLong("zipfian.max")
val constant = config.getDouble("zipfian.constant")
val scrambled = config.getBoolean("zipfian.scrambled")
Zipfian(min, max, constant, scrambled)
}
Synthetic(generator, config.getInt("events"))
}
}
final case class Trace(path: String, format: String) extends PatternSettings
object Trace {
def apply(patternConfig: Config): Trace = {
val config = patternConfig.getConfig("trace")
val path = config.getString("path")
val format = lowerCase(config.getString("format"))
Trace(path, format)
}
}
def apply(simulatorConfig: Config, pattern: String): PatternSettings = {
val config = simulatorConfig.getConfig(pattern).withFallback(simulatorConfig.getConfig("pattern-defaults"))
lowerCase(config.getString("pattern")) match {
case "synthetic" => Synthetic(config)
case "trace" => Trace(config)
case _ => sys.error(s"Unknown pattern for [$pattern]")
}
}
}
private def lowerCase(s: String): String = s.toLowerCase(Locale.ROOT)
}

View file

@ -0,0 +1,140 @@
/*
* Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.passivation.simulator
import akka.stream.scaladsl.Sink
import scala.concurrent.Future
object SimulatorStats {
def apply(): Sink[Simulator.Event, Future[ShardingStats]] =
Sink.fold(ShardingStats()) {
case (stats, Simulator.Accessed(regionId, shardId, _)) =>
stats.accessed(regionId, shardId)
case (stats, Simulator.Activated(regionId, shardId, _)) =>
stats.activated(regionId, shardId)
case (stats, Simulator.Passivated(regionId, shardId, entityIds)) =>
stats.passivated(regionId, shardId, entityIds.size)
}
}
final case class EntityStats(accesses: Int = 0, activations: Int = 0, passivations: Int = 0) {
def accessed(): EntityStats = copy(accesses + 1, activations, passivations)
def activated(): EntityStats = copy(accesses, activations + 1, passivations)
def passivated(n: Int): EntityStats = copy(accesses, activations, passivations + n)
def activePercent: Float = (1 - (activations.toFloat / accesses)) * 100
def ++(other: EntityStats): EntityStats =
EntityStats(accesses + other.accesses, activations + other.activations, passivations + other.passivations)
}
final case class RegionStats(shardStats: Map[String, EntityStats] = Map.empty) {
def accessed(shardId: String): RegionStats =
RegionStats(shardStats.updated(shardId, shardStats.getOrElse(shardId, EntityStats()).accessed()))
def activated(shardId: String): RegionStats =
RegionStats(shardStats.updated(shardId, shardStats.getOrElse(shardId, EntityStats()).activated()))
def passivated(shardId: String, n: Int): RegionStats =
RegionStats(shardStats.updated(shardId, shardStats.getOrElse(shardId, EntityStats()).passivated(n)))
def totals: EntityStats = shardStats.values.foldLeft(EntityStats())(_ ++ _)
}
final case class ShardingStats(regionStats: Map[String, RegionStats] = Map.empty) {
def accessed(regionId: String, shardId: String): ShardingStats =
ShardingStats(regionStats.updated(regionId, regionStats.getOrElse(regionId, RegionStats()).accessed(shardId)))
def activated(regionId: String, shardId: String): ShardingStats =
ShardingStats(regionStats.updated(regionId, regionStats.getOrElse(regionId, RegionStats()).activated(shardId)))
def passivated(regionId: String, shardId: String, n: Int): ShardingStats =
ShardingStats(regionStats.updated(regionId, regionStats.getOrElse(regionId, RegionStats()).passivated(shardId, n)))
def ++(other: ShardingStats): ShardingStats =
ShardingStats(regionStats ++ other.regionStats)
def totals: EntityStats = regionStats.values.foldLeft(EntityStats())(_ ++ _.totals)
}
case class DataTable(headers: DataTable.Row, rows: Seq[DataTable.Row])
object DataTable {
type Row = IndexedSeq[String]
object Headers {
val EntityStats: Row = IndexedSeq("Active", "Accesses", "Activations", "Passivations")
val RegionStats: Row = "Shard" +: EntityStats
val ShardingStats: Row = "Region" +: RegionStats
val RunStats: Row = "Run" +: EntityStats
}
def apply(stats: EntityStats): DataTable =
DataTable(Headers.EntityStats, Seq(row(stats)))
def row(stats: EntityStats): Row =
IndexedSeq(
f"${stats.activePercent}%.2f %%",
f"${stats.accesses}%,d",
f"${stats.activations}%,d",
f"${stats.passivations}%,d")
def apply(stats: RegionStats): DataTable =
DataTable(Headers.RegionStats, stats.shardStats.toSeq.sortBy(_._1).flatMap {
case (shardId, stats) => DataTable(stats).rows.map(shardId +: _)
})
def apply(stats: ShardingStats): DataTable =
DataTable(Headers.ShardingStats, stats.regionStats.toSeq.sortBy(_._1).flatMap {
case (regionId, stats) => DataTable(stats).rows.map(regionId +: _)
})
}
object PrintData {
def apply(data: DataTable): Unit = {
val columnWidths = determineColumnWidths(data)
val builder = new StringBuilder
builder ++= topDivider(columnWidths)
builder ++= line(data.headers, columnWidths)
data.rows.take(1).foreach { row =>
builder ++= headerDivider(columnWidths)
builder ++= line(row, columnWidths)
}
data.rows.drop(1).foreach { rowData =>
builder ++= rowDivider(columnWidths)
builder ++= line(rowData, columnWidths)
}
builder ++= bottomDivider(columnWidths)
builder ++= "\n"
println(builder.result())
}
private def determineColumnWidths(data: DataTable): IndexedSeq[Int] = {
val allRows = data.headers +: data.rows
data.headers.indices.map(i => allRows.map(row => row(i).length).max)
}
private def topDivider(columnWidths: Seq[Int]): String =
divider("╔", "═", "╤", "╗", columnWidths)
private def headerDivider(columnWidths: Seq[Int]): String =
divider("╠", "═", "╪", "╣", columnWidths)
private def rowDivider(columnWidths: Seq[Int]): String =
divider("╟", "─", "┼", "╢", columnWidths)
private def bottomDivider(columnWidths: Seq[Int]): String =
divider("╚", "═", "╧", "╝", columnWidths)
private def divider(start: String, line: String, separator: String, end: String, columnWidths: Seq[Int]): String =
columnWidths.map(width => line * (width + 2)).mkString(start, separator, end) + "\n"
private def line(row: DataTable.Row, columnWidths: Seq[Int]): String =
row.zip(columnWidths).map({ case (cell, width) => pad(cell, width) }).mkString("║ ", " │ ", " ║") + "\n"
private def pad(string: String, width: Int): String =
" " * (width - string.length) + string
}

View file

@ -125,7 +125,8 @@ object AkkaDisciplinePlugin extends AutoPlugin {
// https://github.com/akka/akka/issues/26119 // https://github.com/akka/akka/issues/26119
Compile / doc / scalacOptions --= disciplineScalacOptions.toSeq :+ "-Xfatal-warnings", Compile / doc / scalacOptions --= disciplineScalacOptions.toSeq :+ "-Xfatal-warnings",
// having discipline warnings in console is just an annoyance // having discipline warnings in console is just an annoyance
Compile / console / scalacOptions --= disciplineScalacOptions.toSeq) Compile / console / scalacOptions --= disciplineScalacOptions.toSeq,
Test / console / scalacOptions --= disciplineScalacOptions.toSeq)
} else { } else {
// we still need these in opt-out since the annotations are present // we still need these in opt-out since the annotations are present
nowarnSettings ++ Seq(Compile / scalacOptions += "-deprecation") nowarnSettings ++ Seq(Compile / scalacOptions += "-deprecation")

View file

@ -181,6 +181,9 @@ object Dependencies {
val reactiveStreamsTck = "org.reactivestreams" % "reactive-streams-tck" % reactiveStreamsVersion % "test" // CC0 val reactiveStreamsTck = "org.reactivestreams" % "reactive-streams-tck" % reactiveStreamsVersion % "test" // CC0
val protobufRuntime = "com.google.protobuf" % "protobuf-java" % protobufJavaVersion % "test" val protobufRuntime = "com.google.protobuf" % "protobuf-java" % protobufJavaVersion % "test"
// YCSB (Yahoo Cloud Serving Benchmark https://ycsb.site)
val ycsb = "site.ycsb" % "core" % "0.17.0" % "test" // ApacheV2
} }
object Provided { object Provided {
@ -262,7 +265,8 @@ object Dependencies {
Provided.levelDBNative, Provided.levelDBNative,
Test.junit, Test.junit,
Test.scalatest.value, Test.scalatest.value,
Test.commonsIo) Test.commonsIo,
Test.ycsb)
val clusterMetrics = l ++= Seq( val clusterMetrics = l ++= Seq(
Provided.sigarLoader, Provided.sigarLoader,