diff --git a/akka-bench-jmh/src/main/scala/akka/actor/typed/delivery/ReliableDeliveryBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/actor/typed/delivery/ReliableDeliveryBenchmark.scala index 1792a756f1..ed5ed75a72 100644 --- a/akka-bench-jmh/src/main/scala/akka/actor/typed/delivery/ReliableDeliveryBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/actor/typed/delivery/ReliableDeliveryBenchmark.scala @@ -9,11 +9,15 @@ import java.util.concurrent.TimeUnit import scala.concurrent.Await import scala.concurrent.duration._ +import scala.util.Failure +import scala.util.Success import akka.Done import akka.actor.typed.ActorRef import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior +import akka.actor.typed.delivery.ProducerController.MessageWithConfirmation +import akka.actor.typed.receptionist.ServiceKey import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.scaladsl.Behaviors import com.typesafe.config.ConfigFactory @@ -24,9 +28,13 @@ object Producer { case object Run extends Command private case class WrappedRequestNext(r: ProducerController.RequestNext[Consumer.Command]) extends Command + private case object AskReply extends Command + + private implicit val askTimeout: akka.util.Timeout = 5.seconds def apply( numberOfMessages: Int, + useAsk: Boolean, producerController: ActorRef[ProducerController.Command[Consumer.Command]]): Behavior[Command] = { Behaviors.setup { context => val requestNextAdapter = @@ -37,6 +45,14 @@ object Producer { if (next.confirmedSeqNr >= numberOfMessages) { context.log.info("Completed {} messages", numberOfMessages) Behaviors.stopped + } else if (useAsk) { + context.ask[MessageWithConfirmation[Consumer.Command], ProducerController.SeqNr]( + next.askNextTo, + askReplyTo => MessageWithConfirmation(Consumer.TheMessage, askReplyTo)) { + case Success(_) => AskReply + case Failure(e) => throw e + } + Behaviors.same } else { next.sendNextTo ! Consumer.TheMessage Behaviors.same @@ -46,6 +62,9 @@ object Producer { context.log.info("Starting {} messages", numberOfMessages) producerController ! ProducerController.Start(requestNextAdapter) Behaviors.same + + case AskReply => + Behaviors.same } } } @@ -58,6 +77,9 @@ object Consumer { private case class WrappedDelivery(d: ConsumerController.Delivery[Command]) extends Command + def serviceKey(testName: String): ServiceKey[ConsumerController.Command[Command]] = + ServiceKey[ConsumerController.Command[Consumer.Command]](testName) + def apply(consumerController: ActorRef[ConsumerController.Command[Command]]): Behavior[Command] = { Behaviors.setup { context => val deliveryAdapter = @@ -74,33 +96,88 @@ object Consumer { } } +object WorkPullingProducer { + trait Command + + case object Run extends Command + private case class WrappedRequestNext(r: WorkPullingProducerController.RequestNext[Consumer.Command]) extends Command + + def apply( + numberOfMessages: Int, + producerController: ActorRef[WorkPullingProducerController.Command[Consumer.Command]]): Behavior[Command] = { + Behaviors.setup { context => + val requestNextAdapter = + context.messageAdapter[WorkPullingProducerController.RequestNext[Consumer.Command]](WrappedRequestNext(_)) + var remaining = numberOfMessages + context.system.settings.config + .getInt("akka.reliable-delivery.consumer-controller.flow-control-window") + + Behaviors.receiveMessage { + case WrappedRequestNext(next) => + remaining -= 1 + if (remaining == 0) { + context.log.info("Completed {} messages", numberOfMessages) + Behaviors.stopped + } else { + next.sendNextTo ! Consumer.TheMessage + Behaviors.same + } + + case Run => + context.log.info("Starting {} messages", numberOfMessages) + producerController ! WorkPullingProducerController.Start(requestNextAdapter) + Behaviors.same + } + } + } +} + object Guardian { trait Command - final case class Run(id: String, numberOfMessages: Int, replyTo: ActorRef[Done]) extends Command - final case class ProducerTerminated(consumer: ActorRef[Consumer.Command], replyTo: ActorRef[Done]) extends Command + final case class RunPointToPoint(id: String, numberOfMessages: Int, useAsk: Boolean, replyTo: ActorRef[Done]) + extends Command + final case class RunWorkPulling(id: String, numberOfMessages: Int, workers: Int, replyTo: ActorRef[Done]) + extends Command + final case class ProducerTerminated(consumers: List[ActorRef[Consumer.Command]], replyTo: ActorRef[Done]) + extends Command def apply(): Behavior[Command] = { Behaviors.setup { context => Behaviors.receiveMessage { - case Run(id, numberOfMessages, replyTo) => - val consumerController = context.spawn(ConsumerController[Consumer.Command](), s"consumerController-$id") - val consumer = context.spawn(Consumer(consumerController), s"consumer-$id") + case RunPointToPoint(id, numberOfMessages, useAsk, replyTo) => + // point-to-point + val consumerController = + context.spawn(ConsumerController[Consumer.Command](), s"consumerController-$id") + val consumers = List(context.spawn(Consumer(consumerController), s"consumer-$id")) val producerController = context.spawn( ProducerController[Consumer.Command](id, durableQueueBehavior = None), s"producerController-$id") - val producer = context.spawn(Producer(numberOfMessages, producerController), s"producer-$id") - context.watchWith(producer, ProducerTerminated(consumer, replyTo)) - + val producer = context.spawn(Producer(numberOfMessages, useAsk, producerController), s"producer-$id") consumerController ! ConsumerController.RegisterToProducerController(producerController) - + context.watchWith(producer, ProducerTerminated(consumers, replyTo)) producer ! Producer.Run - Behaviors.same - case ProducerTerminated(consumer, replyTo) => - context.stop(consumer) + case RunWorkPulling(id, numberOfMessages, workers, replyTo) => + // workPulling + val sKey = Consumer.serviceKey(id) + val consumerController = + context.spawn(ConsumerController[Consumer.Command](sKey), s"consumerController-$id") + val consumers = (1 to workers).map { n => + context.spawn(Consumer(consumerController), s"consumer-$n-$id") + }.toList + + val producerController = context.spawn( + WorkPullingProducerController[Consumer.Command](id, sKey, durableQueueBehavior = None), + s"producerController-$id") + val producer = context.spawn(WorkPullingProducer(numberOfMessages, producerController), s"producer-$id") + context.watchWith(producer, ProducerTerminated(consumers, replyTo)) + producer ! WorkPullingProducer.Run + Behaviors.same + + case ProducerTerminated(consumers, replyTo) => + consumers.foreach(context.stop) replyTo ! Done Behaviors.same } @@ -122,7 +199,7 @@ object ReliableDeliveryBenchmark { class ReliableDeliveryBenchmark { import ReliableDeliveryBenchmark._ - @Param(Array("10", "50")) + @Param(Array("50")) var window = 0 implicit var system: ActorSystem[Guardian.Command] = _ @@ -150,8 +227,36 @@ class ReliableDeliveryBenchmark { @Benchmark @OperationsPerInvocation(messagesPerOperation) - def echo(): Unit = { - Await.result(system.ask(Guardian.Run(UUID.randomUUID().toString, messagesPerOperation, _)), timeout) + def pointToPoint(): Unit = { + Await.result( + system.ask( + Guardian.RunPointToPoint(s"point-to-point-${UUID.randomUUID()}", messagesPerOperation, useAsk = false, _)), + timeout) + } + + @Benchmark + @OperationsPerInvocation(messagesPerOperation) + def pointToPointAsk(): Unit = { + Await.result( + system.ask( + Guardian.RunPointToPoint(s"point-to-point-${UUID.randomUUID()}", messagesPerOperation, useAsk = true, _)), + timeout) + } + + @Benchmark + @OperationsPerInvocation(messagesPerOperation) + def workPulling1(): Unit = { + Await.result( + system.ask(Guardian.RunWorkPulling(s"work-pulling-${UUID.randomUUID()}", messagesPerOperation, workers = 1, _)), + timeout) + } + + @Benchmark + @OperationsPerInvocation(messagesPerOperation) + def workPulling2(): Unit = { + Await.result( + system.ask(Guardian.RunWorkPulling(s"work-pulling-${UUID.randomUUID()}", messagesPerOperation, workers = 2, _)), + timeout) } }