diff --git a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/delivery/DeliveryThroughputSpec.scala b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/delivery/DeliveryThroughputSpec.scala new file mode 100644 index 0000000000..7346a64c9a --- /dev/null +++ b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/delivery/DeliveryThroughputSpec.scala @@ -0,0 +1,489 @@ +/* + * Copyright (C) 2016-2020 Lightbend Inc. + */ + +package akka.cluster.sharding.typed.delivery + +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit.NANOSECONDS + +import scala.concurrent.duration._ + +import akka.actor.ActorIdentity +import akka.actor.Identify +import akka.actor.testkit.typed.scaladsl.TestProbe +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.PostStop +import akka.actor.typed.delivery.ConsumerController +import akka.actor.typed.delivery.ProducerController +import akka.actor.typed.delivery.WorkPullingProducerController +import akka.actor.typed.receptionist.ServiceKey +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.scaladsl.adapter._ +import akka.cluster.MultiNodeClusterSpec +import akka.cluster.sharding.typed.ShardingEnvelope +import akka.cluster.sharding.typed.scaladsl.ClusterSharding +import akka.cluster.sharding.typed.scaladsl.Entity +import akka.cluster.sharding.typed.scaladsl.EntityTypeKey +import akka.cluster.typed.MultiNodeTypedClusterSpec +import akka.remote.artery.BenchmarkFileReporter +import akka.remote.artery.PlotResult +import akka.remote.artery.TestRateReporter +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.PerfFlamesSupport +import akka.serialization.jackson.CborSerializable +import com.typesafe.config.ConfigFactory + +object DeliveryThroughputSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + + val barrierTimeout = 5.minutes + + val cfg = ConfigFactory.parseString(s""" + # for serious measurements you should increase the totalMessagesFactor (30) + akka.test.DeliveryThroughputSpec.totalMessagesFactor = 10.0 + akka.reliable-delivery { + consumer-controller.flow-control-window = 50 + sharding.consumer-controller.flow-control-window = 50 + sharding.producer-controller.cleanup-unused-after = 5s + } + akka { + loglevel = INFO + log-dead-letters = off + testconductor.barrier-timeout = ${barrierTimeout.toSeconds}s + actor.allow-java-serialization = off + # quicker dissemination the service keys + cluster.typed.receptionist.distributed-data.write-consistency = all + cluster.sharding.passivate-idle-entity-after = 5s + } + """) + + commonConfig(debugConfig(on = false).withFallback(cfg).withFallback(MultiNodeClusterSpec.clusterConfig)) + + nodeConfig(second, third) { + ConfigFactory.parseString(""" + akka.cluster.roles = ["worker"] + """) + } + + lazy val reporterExecutor = Executors.newFixedThreadPool(10) + def newRateReporter(name: String): TestRateReporter = { + val r = new TestRateReporter(name) + reporterExecutor.execute(r) + r + } + + object Consumer { + trait Command + + case object TheMessage extends Command with CborSerializable + case object Stop extends Command + + private case class WrappedDelivery(d: ConsumerController.Delivery[Command]) extends Command + + def apply(consumerController: ActorRef[ConsumerController.Start[Command]]): Behavior[Command] = { + Behaviors.setup { context => + val traceEnabled = context.log.isTraceEnabled + val deliveryAdapter = + context.messageAdapter[ConsumerController.Delivery[Command]](WrappedDelivery(_)) + consumerController ! ConsumerController.Start(deliveryAdapter) + val rateReporter = newRateReporter(context.self.path.elements.mkString("/", "/", "")) + + var c = 0L + + def report(): Unit = { + rateReporter.onMessage(1, 0) // not using the payload size + c += 1 + } + + Behaviors + .receiveMessagePartial[Command] { + case WrappedDelivery(d @ ConsumerController.Delivery(_, confirmTo)) => + report() + if (traceEnabled) + context.log.trace("Processed {}", d.seqNr) + confirmTo ! ConsumerController.Confirmed + Behaviors.same + case Stop => + Behaviors.stopped + } + .receiveSignal { + case (_, PostStop) => + rateReporter.halt() + Behaviors.same + } + + } + + } + } + + object Producer { + trait Command + + case object Run extends Command + private case class WrappedRequestNext(r: ProducerController.RequestNext[Consumer.Command]) extends Command + + def apply( + producerController: ActorRef[ProducerController.Command[Consumer.Command]], + testSettings: TestSettings, + plotRef: ActorRef[PlotResult], + reporter: BenchmarkFileReporter): Behavior[Command] = { + val numberOfMessages = testSettings.totalMessages + + Behaviors.setup { context => + val requestNextAdapter = + context.messageAdapter[ProducerController.RequestNext[Consumer.Command]](WrappedRequestNext(_)) + var startTime = System.nanoTime() + + Behaviors.receiveMessage { + case WrappedRequestNext(next) => + if (next.confirmedSeqNr >= numberOfMessages) { + context.log.info("Completed {} messages", numberOfMessages) + reportEnd(startTime, testSettings, plotRef, reporter) + Behaviors.stopped + } else { + next.sendNextTo ! Consumer.TheMessage + Behaviors.same + } + + case Run => + context.log.info("Starting {} messages", numberOfMessages) + startTime = System.nanoTime() + producerController ! ProducerController.Start(requestNextAdapter) + Behaviors.same + } + } + } + + def reportEnd( + startTime: Long, + testSettings: TestSettings, + plotRef: ActorRef[PlotResult], + resultReporter: BenchmarkFileReporter): Unit = { + val numberOfMessages = testSettings.totalMessages + val took = NANOSECONDS.toMillis(System.nanoTime - startTime) + val throughput = (numberOfMessages * 1000.0 / took) + + resultReporter.reportResults( + s"=== ${resultReporter.testName} ${testSettings.testName}: " + + f"throughput ${throughput}%,.0f msg/s, " + + s"$took ms to deliver $numberOfMessages.") + + plotRef ! PlotResult().add(testSettings.testName, throughput) + } + } + + def serviceKey(testName: String) = ServiceKey[ConsumerController.Command[Consumer.Command]](testName) + + object WorkPullingProducer { + trait Command + + case object Run extends Command + private case class WrappedRequestNext(r: WorkPullingProducerController.RequestNext[Consumer.Command]) + extends Command + + def apply( + producerController: ActorRef[WorkPullingProducerController.Command[Consumer.Command]], + testSettings: TestSettings, + plotRef: ActorRef[PlotResult], + resultReporter: BenchmarkFileReporter): Behavior[Command] = { + val numberOfMessages = testSettings.totalMessages + + Behaviors.setup { context => + val requestNextAdapter = + context.messageAdapter[WorkPullingProducerController.RequestNext[Consumer.Command]](WrappedRequestNext(_)) + var startTime = System.nanoTime() + var remaining = numberOfMessages + context.system.settings.config + .getInt("akka.reliable-delivery.consumer-controller.flow-control-window") + + Behaviors.receiveMessage { + case WrappedRequestNext(next) => + remaining -= 1 + if (remaining == 0) { + context.log.info("Completed {} messages", numberOfMessages) + Producer.reportEnd(startTime, testSettings, plotRef, resultReporter) + Behaviors.stopped + } else { + next.sendNextTo ! Consumer.TheMessage + Behaviors.same + } + + case Run => + context.log.info("Starting {} messages", numberOfMessages) + startTime = System.nanoTime() + producerController ! WorkPullingProducerController.Start(requestNextAdapter) + Behaviors.same + } + } + } + } + + def typeKey(testName: String) = EntityTypeKey[ConsumerController.SequencedMessage[Consumer.Command]](testName) + + object ShardingProducer { + trait Command + + case object Run extends Command + private case class WrappedRequestNext(r: ShardingProducerController.RequestNext[Consumer.Command]) extends Command + + def apply( + producerController: ActorRef[ShardingProducerController.Command[Consumer.Command]], + testSettings: TestSettings, + plotRef: ActorRef[PlotResult], + resultReporter: BenchmarkFileReporter): Behavior[Command] = { + val numberOfMessages = testSettings.totalMessages + + Behaviors.setup { context => + val requestNextAdapter = + context.messageAdapter[ShardingProducerController.RequestNext[Consumer.Command]](WrappedRequestNext(_)) + var startTime = System.nanoTime() + var remaining = numberOfMessages + context.system.settings.config + .getInt("akka.reliable-delivery.sharding.consumer-controller.flow-control-window") + + Behaviors.receiveMessage { + case WrappedRequestNext(next) => + remaining -= 1 + if (remaining == 0) { + context.log.info("Completed {} messages", numberOfMessages) + Producer.reportEnd(startTime, testSettings, plotRef, resultReporter) + Behaviors.stopped + } else { + val entityId = (remaining % testSettings.numberOfConsumers).toString + if (next.entitiesWithDemand(entityId) || !next.bufferedForEntitiesWithoutDemand.contains(entityId)) + next.sendNextTo ! ShardingEnvelope(entityId, Consumer.TheMessage) + Behaviors.same + } + + case Run => + context.log.info("Starting {} messages", numberOfMessages) + startTime = System.nanoTime() + producerController ! ShardingProducerController.Start(requestNextAdapter) + Behaviors.same + } + } + } + } + + final case class TestSettings(testName: String, totalMessages: Long, numberOfConsumers: Int) + +} + +class DeliveryThroughputSpecMultiJvmNode1 extends DeliveryThroughputSpec +class DeliveryThroughputSpecMultiJvmNode2 extends DeliveryThroughputSpec +class DeliveryThroughputSpecMultiJvmNode3 extends DeliveryThroughputSpec + +abstract class DeliveryThroughputSpec + extends MultiNodeSpec(DeliveryThroughputSpec) + with MultiNodeTypedClusterSpec + with PerfFlamesSupport { + + import DeliveryThroughputSpec._ + + private val totalMessagesFactor = + system.settings.config.getDouble("akka.test.DeliveryThroughputSpec.totalMessagesFactor") + + private var plot = PlotResult() + + private def adjustedTotalMessages(n: Long): Long = (n * totalMessagesFactor).toLong + + override def initialParticipants = roles.size + + override def afterAll(): Unit = { + reporterExecutor.shutdown() + runOn(first) { + println(plot.csv(system.name)) + } + super.afterAll() + } + + def identify(name: String, r: RoleName): akka.actor.ActorRef = { + val sel = system.actorSelection(node(r) / "user" / name) + sel.tell(Identify(None), testActor) + expectMsgType[ActorIdentity](10.seconds).ref.get + } + + private val settingsToReport = List( + "akka.test.DeliveryThroughputSpec.totalMessagesFactor", + "akka.reliable-delivery.consumer-controller.flow-control-window") + private val resultReporter = BenchmarkFileReporter("DeliveryThroughputSpec", system, settingsToReport) + + def testPointToPoint(testSettings: TestSettings): Unit = { + import testSettings._ + + runPerfFlames(first, second)(delay = 5.seconds) + + runOn(second) { + val consumerController = system.spawn(ConsumerController[Consumer.Command](), s"consumerController-$testName") + val consumer = system.spawn(Consumer(consumerController), s"consumer-$testName") + enterBarrier(testName + "-consumer-started") + enterBarrier(testName + "-done") + consumer ! Consumer.Stop + } + + runOn(first) { + enterBarrier(testName + "-consumer-started") + val consumerController = + identify(s"consumerController-$testName", second).toTyped[ConsumerController.Command[Consumer.Command]] + val plotProbe = TestProbe[PlotResult]() + val producerController = system.spawn( + ProducerController[Consumer.Command](testName, durableQueueBehavior = None), + s"producerController-$testName") + val producer = + system.spawn(Producer(producerController, testSettings, plotProbe.ref, resultReporter), s"producer-$testName") + producerController ! ProducerController.RegisterConsumer(consumerController) + producer ! Producer.Run + val terminationProbe = TestProbe() + terminationProbe.expectTerminated(producer, 1.minute) + val plotResult = plotProbe.receiveMessage() + plot = plot.addAll(plotResult) + enterBarrier(testName + "-done") + } + + runOn(third) { + enterBarrier(testName + "-consumer-started") + enterBarrier(testName + "-done") + } + + enterBarrier("after-" + testName) + } + + def testWorkPulling(testSettings: TestSettings): Unit = { + import testSettings._ + + runPerfFlames(first, second, third)(delay = 5.seconds) + + runOn(second, third) { + val range = if (myself == second) (1 to numberOfConsumers by 2) else (2 to numberOfConsumers by 2) + val consumers = range.map { n => + val consumerController = + system.spawn(ConsumerController[Consumer.Command](serviceKey(testName)), s"consumerController-$n-$testName") + system.spawn(Consumer(consumerController), s"consumer-$n-$testName") + } + enterBarrier(testName + "-consumer-started") + enterBarrier(testName + "-done") + consumers.foreach(_ ! Consumer.Stop) + } + + runOn(first) { + enterBarrier(testName + "-consumer-started") + val plotProbe = TestProbe[PlotResult]() + val producerController = system.spawn( + WorkPullingProducerController[Consumer.Command](testName, serviceKey(testName), durableQueueBehavior = None), + s"producerController-$testName") + val producer = + system.spawn( + WorkPullingProducer(producerController, testSettings, plotProbe.ref, resultReporter), + s"producer-$testName") + producer ! WorkPullingProducer.Run + val terminationProbe = TestProbe() + terminationProbe.expectTerminated(producer, 1.minute) + val plotResult = plotProbe.receiveMessage() + plot = plot.addAll(plotResult) + enterBarrier(testName + "-done") + } + + enterBarrier("after-" + testName) + } + + def testSharding(testSettings: TestSettings): Unit = { + import testSettings._ + + runPerfFlames(first, second, third)(delay = 5.seconds) + + val region = ClusterSharding(typedSystem).init(Entity(typeKey(testName))(_ => + ShardingConsumerController(consumerController => Consumer(consumerController))).withRole("worker")) + enterBarrier(testName + "-sharding-init") + + runOn(first) { + val plotProbe = TestProbe[PlotResult]() + val producerController = system.spawn( + ShardingProducerController[Consumer.Command](testName, region, durableQueueBehavior = None), + s"producerController-$testName") + val producer = + system.spawn( + ShardingProducer(producerController, testSettings, plotProbe.ref, resultReporter), + s"producer-$testName") + producer ! ShardingProducer.Run + val terminationProbe = TestProbe() + terminationProbe.expectTerminated(producer, 1.minute) + val plotResult = plotProbe.receiveMessage() + plot = plot.addAll(plotResult) + } + + enterBarrier("after-" + testName) + } + + "Reliable delivery throughput" must { + + "form cluster" in { + formCluster(first, second, third) + } + + "warmup point-to-point" in { + val testSettings = TestSettings("warmup-point-to-point", adjustedTotalMessages(20000), 1) + testPointToPoint(testSettings) + } + + "be measured for point-to-point" in { + val testSettings = TestSettings("1-to-1", adjustedTotalMessages(50000), 1) + testPointToPoint(testSettings) + } + + "warmup work-pulling" in { + val testSettings = TestSettings("warmup-work-pulling", adjustedTotalMessages(10000), 2) + testWorkPulling(testSettings) + } + + "be measured for work-pulling with 1 worker" in { + val testSettings = TestSettings("work-pulling-1", adjustedTotalMessages(20000), 1) + testWorkPulling(testSettings) + } + + "be measured for work-pulling with 2 workers" in { + val testSettings = TestSettings("work-pulling-2", adjustedTotalMessages(30000), 2) + testWorkPulling(testSettings) + } + + "be measured for work-pulling with 4 workers" in { + val testSettings = TestSettings("work-pulling-4", adjustedTotalMessages(40000), 4) + testWorkPulling(testSettings) + } + + "be measured for work-pulling with 10 workers" in { + val testSettings = TestSettings("work-pulling-20", adjustedTotalMessages(40000), 10) + testWorkPulling(testSettings) + } + + "warmup sharding" in { + val testSettings = TestSettings("warmup-sharding", adjustedTotalMessages(10000), 2) + testSharding(testSettings) + } + + "be measured for sharding with 1 entity" in { + val testSettings = TestSettings("sharding-1", adjustedTotalMessages(20000), 1) + testSharding(testSettings) + } + + "be measured for sharding with 2 entities" in { + val testSettings = TestSettings("sharding-2", adjustedTotalMessages(20000), 2) + testSharding(testSettings) + } + + "be measured for sharding with 4 entities" in { + val testSettings = TestSettings("sharding-4", adjustedTotalMessages(20000), 4) + testSharding(testSettings) + } + + "be measured for sharding with 10 entities" in { + val testSettings = TestSettings("sharding-10", adjustedTotalMessages(20000), 10) + testSharding(testSettings) + } + + } +} diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/BenchmarkFileReporter.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/BenchmarkFileReporter.scala index 0b08e157d3..4fe827f766 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/BenchmarkFileReporter.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/BenchmarkFileReporter.scala @@ -33,7 +33,27 @@ object BenchmarkFileReporter { val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss") - def apply(test: String, system: ActorSystem): BenchmarkFileReporter = + def apply(test: String, system: ActorSystem): BenchmarkFileReporter = { + val settingsToReport = + Seq( + "akka.test.MaxThroughputSpec.totalMessagesFactor", + "akka.test.MaxThroughputSpec.real-message", + "akka.test.LatencySpec.totalMessagesFactor", + "akka.test.LatencySpec.repeatCount", + "akka.test.LatencySpec.real-message", + "akka.remote.artery.enabled", + "akka.remote.artery.advanced.inbound-lanes", + "akka.remote.artery.advanced.buffer-pool-size", + "akka.remote.artery.advanced.aeron.idle-cpu-level", + "akka.remote.artery.advanced.aeron.embedded-media-driver", + "akka.remote.default-remote-dispatcher.throughput", + "akka.remote.default-remote-dispatcher.fork-join-executor.parallelism-factor", + "akka.remote.default-remote-dispatcher.fork-join-executor.parallelism-min", + "akka.remote.default-remote-dispatcher.fork-join-executor.parallelism-max") + apply(test, system, settingsToReport) + } + + def apply(test: String, system: ActorSystem, settingsToReport: Seq[String]): BenchmarkFileReporter = new BenchmarkFileReporter { override val testName = test @@ -51,22 +71,6 @@ object BenchmarkFileReporter { val fos = Files.newOutputStream(testResultFile.toPath) reportResults(s"Git commit: $gitCommit") - val settingsToReport = - Seq( - "akka.test.MaxThroughputSpec.totalMessagesFactor", - "akka.test.MaxThroughputSpec.real-message", - "akka.test.LatencySpec.totalMessagesFactor", - "akka.test.LatencySpec.repeatCount", - "akka.test.LatencySpec.real-message", - "akka.remote.artery.enabled", - "akka.remote.artery.advanced.inbound-lanes", - "akka.remote.artery.advanced.buffer-pool-size", - "akka.remote.artery.advanced.aeron.idle-cpu-level", - "akka.remote.artery.advanced.aeron.embedded-media-driver", - "akka.remote.default-remote-dispatcher.throughput", - "akka.remote.default-remote-dispatcher.fork-join-executor.parallelism-factor", - "akka.remote.default-remote-dispatcher.fork-join-executor.parallelism-min", - "akka.remote.default-remote-dispatcher.fork-join-executor.parallelism-max") settingsToReport.foreach(reportSetting) def reportResults(result: String): Unit = synchronized { diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/TestRateReporter.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/TestRateReporter.scala index 17a60dbb65..a7fd631664 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/TestRateReporter.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/TestRateReporter.scala @@ -13,9 +13,16 @@ class TestRateReporter(name: String) bytesPerSec: Double, totalMessages: Long, totalBytes: Long): Unit = { - println( - name + - f": ${messagesPerSec}%,.0f msgs/sec, ${bytesPerSec}%,.0f bytes/sec, " + - f"totals ${totalMessages}%,d messages ${totalBytes / (1024 * 1024)}%,d MB") + if (totalBytes > 0) { + println( + name + + f": ${messagesPerSec}%,.0f msgs/sec, ${bytesPerSec}%,.0f bytes/sec, " + + f"totals ${totalMessages}%,d messages ${totalBytes / (1024 * 1024)}%,d MB") + } else { + println( + name + + f": ${messagesPerSec}%,.0f msgs/sec " + + f"total ${totalMessages}%,d messages") + } } }) {} diff --git a/build.sbt b/build.sbt index e2733c5b2d..c39ea9f69e 100644 --- a/build.sbt +++ b/build.sbt @@ -447,6 +447,7 @@ lazy val clusterShardingTyped = akkaModule("akka-cluster-sharding-typed") persistenceTyped % "test->test", remote % "compile->CompileJdk9;test->test", remoteTests % "test->test", + remoteTests % "test->test;multi-jvm->multi-jvm", jackson % "test->test") .settings(javacOptions += "-parameters") // for Jackson .settings(AutomaticModuleName.settings("akka.cluster.sharding.typed"))