diff --git a/akka-actor-typed/src/main/scala-jdk-9/akka/actor/typed/internal/jfr/Events.scala b/akka-actor-typed/src/main/scala-jdk-9/akka/actor/typed/internal/jfr/Events.scala new file mode 100644 index 0000000000..c6545507dd --- /dev/null +++ b/akka-actor-typed/src/main/scala-jdk-9/akka/actor/typed/internal/jfr/Events.scala @@ -0,0 +1,161 @@ +/* + * Copyright (C) extends Event 2009-2019 Lightbend Inc. + */ + +package akka.actor.typed.internal.jfr + +import akka.annotation.InternalApi +import jdk.jfr.Category +import jdk.jfr.Enabled +import jdk.jfr.Event +import jdk.jfr.Label +import jdk.jfr.StackTrace + +// requires jdk9+ to compile +// for editing these in IntelliJ, open module settings, change JDK dependency to 11 for only this module + +/** INTERNAL API */ +@InternalApi +@Enabled(true) +@StackTrace(false) +@Category(Array("Akka", "Delivery", "ProducerController")) @Label("Delivery ProducerController created") +final class DeliveryProducerCreated(val producerId: String, val actorPath: String) extends Event + +/** INTERNAL API */ +@InternalApi +@Enabled(true) +@StackTrace(false) +@Category(Array("Akka", "Delivery", "ProducerController")) @Label("Delivery ProducerController started") +final class DeliveryProducerStarted(val producerId: String, val actorPath: String) extends Event + +/** INTERNAL API */ +@InternalApi +@Enabled(false) // hi frequency event +@StackTrace(false) +@Category(Array("Akka", "Delivery", "ProducerController")) @Label("Delivery ProducerController sent RequestNext") +final class DeliveryProducerRequestNext(val producerId: String, val currentSeqNr: Long, val confirmedSeqNr: Long) + extends Event + +/** INTERNAL API */ +@InternalApi +@Enabled(false) // hi frequency event +@StackTrace(false) +@Category(Array("Akka", "Delivery", "ProducerController")) @Label("Delivery ProducerController sent SequencedMessage") +final class DeliveryProducerSent(val producerId: String, val seqNr: Long) extends Event + +/** INTERNAL API */ +@InternalApi +@Enabled(true) +@StackTrace(false) +@Category(Array("Akka", "Delivery", "ProducerController")) @Label("Delivery ProducerController waiting for demand") +final class DeliveryProducerWaitingForRequest(val producerId: String, val currentSeqNr: Long) extends Event + +/** INTERNAL API */ +@InternalApi +@Enabled(true) +@StackTrace(false) +@Category(Array("Akka", "Delivery", "ProducerController")) @Label("Delivery ProducerController resent unconfirmed") +final class DeliveryProducerResentUnconfirmed(val producerId: String, val fromSeqNr: Long, val toSeqNr: Long) + extends Event + +/** INTERNAL API */ +@InternalApi +@Enabled(true) +@StackTrace(false) +@Category(Array("Akka", "Delivery", "ProducerController")) @Label("Delivery ProducerController resent first") +final class DeliveryProducerResentFirst(val producerId: String, val firstSeqNr: Long) extends Event + +/** INTERNAL API */ +@InternalApi +@Enabled(true) +@StackTrace(false) +@Category(Array("Akka", "Delivery", "ProducerController")) @Label( + "Delivery ProducerController resent first unconfirmed") +final class DeliveryProducerResentFirstUnconfirmed(val producerId: String, val seqNr: Long) extends Event + +/** INTERNAL API */ +@InternalApi +@Enabled(false) // hi frequency event +@StackTrace(false) +@Category(Array("Akka", "Delivery", "ProducerController")) @Label("Delivery ProducerController received message") +final class DeliveryProducerReceived(val producerId: String, val currentSeqNr: Long) extends Event + +/** INTERNAL API */ +@InternalApi +@Enabled(true) +@StackTrace(false) +@Category(Array("Akka", "Delivery", "ProducerController")) @Label("Delivery ProducerController received demand request") +final class DeliveryProducerReceivedRequest(val producerId: String, val requestedSeqNr: Long) extends Event + +/** INTERNAL API */ +@InternalApi +@Enabled(true) +@StackTrace(false) +@Category(Array("Akka", "Delivery", "ProducerController")) @Label("Delivery ProducerController received resend request") +final class DeliveryProducerReceivedResend(val producerId: String, val fromSeqNr: Long) extends Event + +/** INTERNAL API */ +@InternalApi +@Enabled(true) +@StackTrace(false) +@Category(Array("Akka", "Delivery", "ConsumerController")) @Label("Delivery ConsumerController created") +final class DeliveryConsumerCreated(val actorPath: String) extends Event + +/** INTERNAL API */ +@InternalApi +@Enabled(true) +@StackTrace(false) +@Category(Array("Akka", "Delivery", "ConsumerController")) @Label("Delivery ConsumerController started") +final class DeliveryConsumerStarted(val actorPath: String) extends Event + +/** INTERNAL API */ +@InternalApi +@Enabled(false) // hi frequency event +@StackTrace(false) +@Category(Array("Akka", "Delivery", "ConsumerController")) @Label("Delivery ConsumerController received") +final class DeliveryConsumerReceived(val producerId: String, val seqNr: Long) extends Event + +/** INTERNAL API */ +@InternalApi +@Enabled(false) // hi frequency event +@StackTrace(false) +@Category(Array("Akka", "Delivery", "ConsumerController")) @Label( + "Delivery ConsumerController received, previous in progress") +final class DeliveryConsumerReceivedPreviousInProgress(val producerId: String, val seqNr: Long, val stashed: Int) + extends Event + +/** INTERNAL API */ +@InternalApi +@Enabled(true) +@StackTrace(false) +@Category(Array("Akka", "Delivery", "ConsumerController")) @Label("Delivery ConsumerController received duplicate") +final class DeliveryConsumerDuplicate(val pid: String, val expectedSeqNr: Long, val seqNr: Long) extends Event + +/** INTERNAL API */ +@InternalApi +@Enabled(true) +@StackTrace(false) +@Category(Array("Akka", "Delivery", "ConsumerController")) @Label("Delivery ConsumerController received missing") +final class DeliveryConsumerMissing(val pid: String, val expectedSeqNr: Long, val seqNr: Long) extends Event + +/** INTERNAL API */ +@InternalApi +@Enabled(true) +@StackTrace(false) +@Category(Array("Akka", "Delivery", "ConsumerController")) @Label( + "Delivery ConsumerController received expected resend") +final class DeliveryConsumerReceivedResend(val seqNr: Long) extends Event + +/** INTERNAL API */ +@InternalApi +@Enabled(true) +@StackTrace(false) +@Category(Array("Akka", "Delivery", "ConsumerController")) @Label("Delivery ConsumerController sent demand Request") +final class DeliveryConsumerSentRequest(val producerId: String, val requestedSeqNr: Long) extends Event + +/** INTERNAL API */ +@InternalApi +@Enabled(true) +@StackTrace(false) +@Category(Array("Akka", "Delivery", "ConsumerController")) @Label("Delivery ConsumerController producer changed") +final class DeliveryConsumerChangedProducer(val producerId: String) extends Event diff --git a/akka-actor-typed/src/main/scala-jdk-9/akka/actor/typed/internal/jfr/JFRActorFlightRecorder.scala b/akka-actor-typed/src/main/scala-jdk-9/akka/actor/typed/internal/jfr/JFRActorFlightRecorder.scala new file mode 100644 index 0000000000..deb9244ee3 --- /dev/null +++ b/akka-actor-typed/src/main/scala-jdk-9/akka/actor/typed/internal/jfr/JFRActorFlightRecorder.scala @@ -0,0 +1,68 @@ +/* + * Copyright (C) 2009-2020 Lightbend Inc. + */ + +package akka.actor.typed.internal.jfr + +import akka.actor.ActorPath +import akka.actor.typed.ActorSystem +import akka.actor.typed.internal.ActorFlightRecorder +import akka.actor.typed.internal.DeliveryFlightRecorder +import akka.annotation.InternalApi + +/** + * INTERNAL API + */ +@InternalApi +private[akka] final class JFRActorFlightRecorder(val system: ActorSystem[_]) extends ActorFlightRecorder { + override val delivery: DeliveryFlightRecorder = new JFRDeliveryFlightRecorder +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] final class JFRDeliveryFlightRecorder extends DeliveryFlightRecorder { + + override def producerCreated(producerId: String, path: ActorPath): Unit = + new DeliveryProducerCreated(producerId, path.toString).commit() + override def producerStarted(producerId: String, path: ActorPath): Unit = + new DeliveryProducerStarted(producerId, path.toString).commit() + override def producerRequestNext(producerId: String, currentSeqNr: Long, confirmedSeqNr: Long): Unit = + new DeliveryProducerRequestNext(producerId, currentSeqNr, confirmedSeqNr).commit() + override def producerSent(producerId: String, seqNr: Long): Unit = + new DeliveryProducerSent(producerId, seqNr).commit() + override def producerWaitingForRequest(producerId: String, currentSeqNr: Long): Unit = + new DeliveryProducerWaitingForRequest(producerId, currentSeqNr).commit() + override def producerResentUnconfirmed(producerId: String, fromSeqNr: Long, toSeqNr: Long): Unit = + new DeliveryProducerResentUnconfirmed(producerId, fromSeqNr, toSeqNr).commit() + override def producerResentFirst(producerId: String, firstSeqNr: Long): Unit = + new DeliveryProducerResentFirst(producerId, firstSeqNr).commit() + override def producerResentFirstUnconfirmed(producerId: String, seqNr: Long): Unit = + new DeliveryProducerResentFirstUnconfirmed(producerId, seqNr).commit() + override def producerReceived(producerId: String, currentSeqNr: Long): Unit = + new DeliveryProducerReceived(producerId, currentSeqNr).commit() + override def producerReceivedRequest(producerId: String, requestedSeqNr: Long): Unit = + new DeliveryProducerReceivedRequest(producerId, requestedSeqNr).commit() + override def producerReceivedResend(producerId: String, fromSeqNr: Long): Unit = + new DeliveryProducerReceivedResend(producerId, fromSeqNr).commit() + + override def consumerCreated(path: ActorPath): Unit = + new DeliveryConsumerCreated(path.toString).commit() + override def consumerStarted(path: ActorPath): Unit = + new DeliveryConsumerStarted(path.toString).commit() + override def consumerReceived(producerId: String, seqNr: Long): Unit = + new DeliveryConsumerReceived(producerId, seqNr).commit() + override def consumerReceivedPreviousInProgress(producerId: String, seqNr: Long, stashed: Int): Unit = + new DeliveryConsumerReceivedPreviousInProgress(producerId, seqNr: Long, stashed).commit() + override def consumerDuplicate(pid: String, expectedSeqNr: Long, seqNr: Long): Unit = + new DeliveryConsumerDuplicate(pid, expectedSeqNr, seqNr).commit() + override def consumerMissing(pid: String, expectedSeqNr: Long, seqNr: Long): Unit = + new DeliveryConsumerMissing(pid, expectedSeqNr, seqNr).commit() + override def consumerReceivedResend(seqNr: Long): Unit = + new DeliveryConsumerReceivedResend(seqNr).commit() + override def consumerSentRequest(producerId: String, requestedSeqNr: Long): Unit = + new DeliveryConsumerSentRequest(producerId, requestedSeqNr).commit() + override def consumerChangedProducer(producerId: String): Unit = + new DeliveryConsumerChangedProducer(producerId).commit() + +} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ConsumerControllerImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ConsumerControllerImpl.scala index 80c2b08cac..9765872df0 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ConsumerControllerImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ConsumerControllerImpl.scala @@ -10,6 +10,7 @@ import akka.actor.typed.PostStop import akka.actor.typed.delivery.ConsumerController import akka.actor.typed.delivery.ConsumerController.DeliverThenStop import akka.actor.typed.delivery.ProducerController +import akka.actor.typed.internal.ActorFlightRecorder import akka.actor.typed.receptionist.Receptionist import akka.actor.typed.receptionist.ServiceKey import akka.actor.typed.scaladsl.ActorContext @@ -77,6 +78,7 @@ import akka.annotation.InternalApi private final case class State[A]( producerController: ActorRef[ProducerControllerImpl.InternalCommand], + producerId: String, consumer: ActorRef[ConsumerController.Delivery[A]], receivedSeqNr: SeqNr, confirmedSeqNr: SeqNr, @@ -104,6 +106,8 @@ import akka.annotation.InternalApi Behaviors .withStash[InternalCommand](settings.flowControlWindow) { stashBuffer => Behaviors.setup { context => + val flightRecorder = ActorFlightRecorder(context.system).delivery + flightRecorder.consumerCreated(context.self.path) Behaviors.withMdc(msg => mdcForMessage(msg)) { context.setLoggerName("akka.actor.typed.delivery.ConsumerController") serviceKey.foreach { key => @@ -122,6 +126,7 @@ import akka.annotation.InternalApi ConsumerControllerImpl.enforceLocalConsumer(s.deliverTo) context.watchWith(s.deliverTo, ConsumerTerminated(s.deliverTo)) + flightRecorder.consumerStarted(context.self.path) val activeBehavior = new ConsumerControllerImpl[A](context, timers, stashBuffer, settings) .active(initialState(context, s, registering)) @@ -177,6 +182,7 @@ import akka.annotation.InternalApi registering: Option[ActorRef[ProducerController.Command[A]]]): State[A] = { State( producerController = context.system.deadLetters, + "n/a", start.deliverTo, receivedSeqNr = 0, confirmedSeqNr = 0, @@ -208,6 +214,8 @@ private class ConsumerControllerImpl[A]( import ProducerControllerImpl.Resend import settings.flowControlWindow + private val flightRecorder = ActorFlightRecorder(context.system).delivery + startRetryTimer() private def resendLost = !settings.onlyFlowControl @@ -222,6 +230,8 @@ private class ConsumerControllerImpl[A]( val seqNr = seqMsg.seqNr val expectedSeqNr = s.receivedSeqNr + 1 + flightRecorder.consumerReceived(pid, seqNr) + if (s.isProducerChanged(seqMsg)) { if (seqMsg.first) context.log.trace("Received first SequencedMessage seqNr [{}], delivering to consumer.", seqNr) @@ -235,6 +245,7 @@ private class ConsumerControllerImpl[A]( context.log.trace("Received SequencedMessage seqNr [{}], delivering to consumer.", seqNr) deliver(s.copy(receivedSeqNr = seqNr), seqMsg) } else if (seqNr > expectedSeqNr) { + flightRecorder.consumerMissing(pid, expectedSeqNr, seqNr) context.log.debugN( "Received SequencedMessage seqNr [{}], but expected [{}], {}.", seqNr, @@ -248,6 +259,7 @@ private class ConsumerControllerImpl[A]( waitingForConfirmation(s.copy(receivedSeqNr = seqNr), seqMsg) } } else { // seqNr < expectedSeqNr + flightRecorder.consumerDuplicate(pid, expectedSeqNr, seqNr) context.log.debug2("Received duplicate SequencedMessage seqNr [{}], expected [{}].", seqNr, expectedSeqNr) if (seqMsg.first) active(retryRequest(s)) @@ -294,6 +306,7 @@ private class ConsumerControllerImpl[A]( deliver( s.copy( producerController = seqMsg.producerController, + producerId = seqMsg.producerId, receivedSeqNr = seqNr, confirmedSeqNr = 0L, requestedSeqNr = newRequestedSeqNr, @@ -327,6 +340,7 @@ private class ConsumerControllerImpl[A]( seqMsg.producerController, seqMsg.seqNr) } else { + flightRecorder.consumerChangedProducer(seqMsg.producerId) context.log.debugN( "Changing ProducerController from [{}] to [{}], seqNr [{}].", s.producerController, @@ -354,6 +368,7 @@ private class ConsumerControllerImpl[A]( seqNr) Behaviors.same } else if (s.isNextExpected(seqMsg)) { + flightRecorder.consumerReceivedResend(seqNr) context.log.debug("Received missing SequencedMessage seqNr [{}].", seqNr) deliver(s.copy(receivedSeqNr = seqNr), seqMsg) } else { @@ -417,6 +432,7 @@ private class ConsumerControllerImpl[A]( if (seqMsg.first) { // confirm the first message immediately to cancel resending of first val newRequestedSeqNr = seqNr - 1 + flowControlWindow + flightRecorder.consumerSentRequest(seqMsg.producerId, newRequestedSeqNr) context.log.debug( "Sending Request after first with confirmedSeqNr [{}], requestUpToSeqNr [{}].", seqNr, @@ -425,6 +441,7 @@ private class ConsumerControllerImpl[A]( newRequestedSeqNr } else if ((s.requestedSeqNr - seqNr) == flowControlWindow / 2) { val newRequestedSeqNr = s.requestedSeqNr + flowControlWindow / 2 + flightRecorder.consumerSentRequest(seqMsg.producerId, newRequestedSeqNr) context.log.debug( "Sending Request with confirmedSeqNr [{}], requestUpToSeqNr [{}].", seqNr, @@ -452,7 +469,9 @@ private class ConsumerControllerImpl[A]( } case msg: SequencedMessage[A] => + flightRecorder.consumerReceivedPreviousInProgress(seqMsg.producerId, seqMsg.seqNr, stashBuffer.size + 1) if (msg.seqNr == seqMsg.seqNr && msg.producerController == seqMsg.producerController) { + flightRecorder.consumerDuplicate(msg.producerId, seqMsg.seqNr + 1, msg.seqNr) context.log.debug("Received duplicate SequencedMessage seqNr [{}].", msg.seqNr) } else if (stashBuffer.isFull) { // possible that the stash is full if ProducerController resends unconfirmed (duplicates) @@ -567,6 +586,7 @@ private class ConsumerControllerImpl[A]( // SequenceMessage are arriving. On the other hand it might be too much overhead to reschedule of each // incoming SequenceMessage. val newRequestedSeqNr = if (resendLost) s.requestedSeqNr else s.receivedSeqNr + flowControlWindow / 2 + flightRecorder.consumerSentRequest(s.producerId, newRequestedSeqNr) context.log.debug( "Retry sending Request with confirmedSeqNr [{}], requestUpToSeqNr [{}].", s.confirmedSeqNr, diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ProducerControllerImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ProducerControllerImpl.scala index 39271b0a42..00e9ff6794 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ProducerControllerImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ProducerControllerImpl.scala @@ -18,6 +18,7 @@ import akka.actor.typed.delivery.ConsumerController import akka.actor.typed.delivery.ConsumerController.SequencedMessage import akka.actor.typed.delivery.DurableProducerQueue import akka.actor.typed.delivery.ProducerController +import akka.actor.typed.internal.ActorFlightRecorder import akka.actor.typed.scaladsl.ActorContext import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.LoggerOps @@ -128,6 +129,7 @@ object ProducerControllerImpl { settings: ProducerController.Settings): Behavior[Command[A]] = { Behaviors .setup[InternalCommand] { context => + ActorFlightRecorder(context.system).delivery.producerCreated(producerId, context.self.path) Behaviors.withMdc(staticMdc = Map("producerId" -> producerId)) { context.setLoggerName("akka.actor.typed.delivery.ProducerController") val durableQueue = askLoadState(context, durableQueueBehavior, settings) @@ -161,6 +163,7 @@ object ProducerControllerImpl { send: ConsumerController.SequencedMessage[A] => Unit): Behavior[Command[A]] = { Behaviors .setup[InternalCommand] { context => + ActorFlightRecorder(context.system).delivery.producerCreated(producerId, context.self.path) Behaviors.withMdc(staticMdc = Map("producerId" -> producerId)) { context.setLoggerName("akka.actor.typed.delivery.ProducerController") val durableQueue = askLoadState(context, durableQueueBehavior, settings) @@ -301,10 +304,13 @@ object ProducerControllerImpl { state: State[A]): Behavior[InternalCommand] = { Behaviors.setup { context => + val flightRecorder = ActorFlightRecorder(context.system).delivery + flightRecorder.producerStarted(producerId, context.self.path) Behaviors.withTimers { timers => val msgAdapter: ActorRef[A] = context.messageAdapter(msg => Msg(msg)) val requested = if (state.unconfirmed.isEmpty) { + flightRecorder.producerRequestNext(producerId, 1L, 0) state.producer ! RequestNext(producerId, 1L, 0L, msgAdapter, context.self) true } else { @@ -345,6 +351,7 @@ private class ProducerControllerImpl[A: ClassTag]( import ProducerController.Start import ProducerControllerImpl._ + private val flightRecorder = ActorFlightRecorder(context.system).delivery // for the durableQueue StoreMessageSent ask private implicit val askTimeout: Timeout = settings.durableQueueRequestTimeout @@ -362,11 +369,14 @@ private class ProducerControllerImpl[A: ClassTag]( if (s.currentSeqNr == s.firstSeqNr) timers.startTimerWithFixedDelay(ResendFirst, ResendFirst, 1.second) + flightRecorder.producerSent(producerId, seqMsg.seqNr) s.send(seqMsg) val newRequested = - if (s.currentSeqNr == s.requestedSeqNr) + if (s.currentSeqNr == s.requestedSeqNr) { + flightRecorder.producerWaitingForRequest(producerId, s.currentSeqNr) false - else { + } else { + flightRecorder.producerRequestNext(producerId, s.currentSeqNr + 1, s.confirmedSeqNr) s.producer ! RequestNext(producerId, s.currentSeqNr + 1, s.confirmedSeqNr, msgAdapter, context.self) true } @@ -391,6 +401,7 @@ private class ProducerControllerImpl[A: ClassTag]( newRequestedSeqNr: SeqNr, supportResend: Boolean, viaTimeout: Boolean): Behavior[InternalCommand] = { + flightRecorder.producerReceivedRequest(producerId, newRequestedSeqNr) context.log.debugN( "Received Request, confirmed [{}], requested [{}], current [{}]", newConfirmedSeqNr, @@ -422,8 +433,10 @@ private class ProducerControllerImpl[A: ClassTag]( stateAfterAck.currentSeqNr) if (newRequestedSeqNr2 > s.requestedSeqNr) { - if (!s.requested && (newRequestedSeqNr2 - s.currentSeqNr) > 0) + if (!s.requested && (newRequestedSeqNr2 - s.currentSeqNr) > 0) { + flightRecorder.producerRequestNext(producerId, s.currentSeqNr, newConfirmedSeqNr) s.producer ! RequestNext(producerId, s.currentSeqNr, newConfirmedSeqNr, msgAdapter, context.self) + } active( stateAfterAck.copy( requested = true, @@ -485,6 +498,7 @@ private class ProducerControllerImpl[A: ClassTag]( } def receiveResend(fromSeqNr: SeqNr): Behavior[InternalCommand] = { + flightRecorder.producerReceivedResend(producerId, fromSeqNr) val newUnconfirmed = if (fromSeqNr == 0 && s.unconfirmed.nonEmpty) s.unconfirmed.head.asFirst +: s.unconfirmed.tail @@ -495,13 +509,18 @@ private class ProducerControllerImpl[A: ClassTag]( } def resendUnconfirmed(newUnconfirmed: Vector[SequencedMessage[A]]): Unit = { - if (newUnconfirmed.nonEmpty) - context.log.debug("Resending [{} - {}].", newUnconfirmed.head.seqNr, newUnconfirmed.last.seqNr) - newUnconfirmed.foreach(s.send) + if (newUnconfirmed.nonEmpty) { + val fromSeqNr = newUnconfirmed.head.seqNr + val toSeqNr = newUnconfirmed.last.seqNr + flightRecorder.producerResentUnconfirmed(producerId, fromSeqNr, toSeqNr) + context.log.debug("Resending [{} - {}].", fromSeqNr, toSeqNr) + newUnconfirmed.foreach(s.send) + } } def receiveResendFirstUnconfirmed(): Behavior[InternalCommand] = { if (s.unconfirmed.nonEmpty) { + flightRecorder.producerResentFirstUnconfirmed(producerId, s.unconfirmed.head.seqNr) context.log.debug("Resending first unconfirmed [{}].", s.unconfirmed.head.seqNr) s.send(s.unconfirmed.head) } @@ -510,6 +529,7 @@ private class ProducerControllerImpl[A: ClassTag]( def receiveResendFirst(): Behavior[InternalCommand] = { if (s.unconfirmed.nonEmpty && s.unconfirmed.head.seqNr == s.firstSeqNr) { + flightRecorder.producerResentFirst(producerId, s.firstSeqNr) context.log.debug("Resending first, [{}].", s.firstSeqNr) s.send(s.unconfirmed.head.asFirst) } else { @@ -522,8 +542,10 @@ private class ProducerControllerImpl[A: ClassTag]( def receiveStart(start: Start[A]): Behavior[InternalCommand] = { ProducerControllerImpl.enforceLocalProducer(start.producer) context.log.debug("Register new Producer [{}], currentSeqNr [{}].", start.producer, s.currentSeqNr) - if (s.requested) + if (s.requested) { + flightRecorder.producerRequestNext(producerId, s.currentSeqNr, s.confirmedSeqNr) start.producer ! RequestNext(producerId, s.currentSeqNr, s.confirmedSeqNr, msgAdapter, context.self) + } active(s.copy(producer = start.producer)) } @@ -547,6 +569,7 @@ private class ProducerControllerImpl[A: ClassTag]( Behaviors.receiveMessage { case MessageWithConfirmation(m: A, replyTo) => + flightRecorder.producerReceived(producerId, s.currentSeqNr) val newReplyAfterStore = s.replyAfterStore.updated(s.currentSeqNr, replyTo) if (durableQueue.isEmpty) { onMsg(m, newReplyAfterStore, ack = true) @@ -558,6 +581,7 @@ private class ProducerControllerImpl[A: ClassTag]( } case Msg(m: A) => + flightRecorder.producerReceived(producerId, s.currentSeqNr) if (durableQueue.isEmpty) { onMsg(m, s.replyAfterStore, ack = false) } else { diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorFlightRecorder.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorFlightRecorder.scala new file mode 100644 index 0000000000..f841f1bfb2 --- /dev/null +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorFlightRecorder.scala @@ -0,0 +1,115 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.actor.typed.internal + +import scala.util.Failure +import scala.util.Success + +import akka.actor.ActorPath +import akka.actor.typed.ActorSystem +import akka.actor.typed.Extension +import akka.actor.typed.ExtensionId +import akka.annotation.InternalApi +import akka.util.JavaVersion + +/** + * INTERNAL API + */ +@InternalApi +object ActorFlightRecorder extends ExtensionId[ActorFlightRecorder] { + + override def createExtension(system: ActorSystem[_]): ActorFlightRecorder = + if (JavaVersion.majorVersion >= 11 && system.settings.config.getBoolean("akka.java-flight-recorder.enabled")) { + // Dynamic instantiation to not trigger class load on earlier JDKs + import scala.language.existentials + system.dynamicAccess.createInstanceFor[ActorFlightRecorder]( + "akka.actor.typed.internal.jfr.JFRActorFlightRecorder", + (classOf[ActorSystem[_]], system) :: Nil) match { + case Success(jfr) => jfr + case Failure(ex) => + system.log.warn("Failed to load JFR Actor flight recorder, falling back to noop. Exception: {}", ex.toString) + NoOpActorFlightRecorder + } // fallback if not possible to dynamically load for some reason + } else + // JFR not available on Java 8 + NoOpActorFlightRecorder +} + +/** + * INTERNAL API + */ +@InternalApi +private[akka] trait ActorFlightRecorder extends Extension { + val delivery: DeliveryFlightRecorder + +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] trait DeliveryFlightRecorder { + + def producerCreated(producerId: String, path: ActorPath): Unit + def producerStarted(producerId: String, path: ActorPath): Unit + def producerRequestNext(producerId: String, currentSeqNr: Long, confirmedSeqNr: Long): Unit + def producerSent(producerId: String, seqNr: Long): Unit + def producerWaitingForRequest(producerId: String, currentSeqNr: Long): Unit + def producerResentUnconfirmed(producerId: String, fromSeqNr: Long, toSeqNr: Long): Unit + def producerResentFirst(producerId: String, firstSeqNr: Long): Unit + def producerResentFirstUnconfirmed(producerId: String, seqNr: Long): Unit + def producerReceived(producerId: String, currentSeqNr: Long): Unit + def producerReceivedRequest(producerId: String, requestedSeqNr: Long): Unit + def producerReceivedResend(producerId: String, fromSeqNr: Long): Unit + + def consumerCreated(path: ActorPath): Unit + def consumerStarted(path: ActorPath): Unit + def consumerReceived(producerId: String, seqNr: Long): Unit + def consumerReceivedPreviousInProgress(producerId: String, seqNr: Long, stashed: Int): Unit + def consumerDuplicate(pid: String, expectedSeqNr: Long, seqNr: Long): Unit + def consumerMissing(pid: String, expectedSeqNr: Long, seqNr: Long): Unit + def consumerReceivedResend(seqNr: Long): Unit + def consumerSentRequest(producerId: String, requestedSeqNr: Long): Unit + def consumerChangedProducer(producerId: String): Unit +} + +/** + * JFR is only available under certain circumstances (JDK11 for now, possible OpenJDK 8 in the future) so therefore + * the default on JDK 8 needs to be a no-op flight recorder. + * + * INTERNAL + */ +@InternalApi +private[akka] case object NoOpActorFlightRecorder extends ActorFlightRecorder { + override val delivery: DeliveryFlightRecorder = NoOpDeliveryFlightRecorder +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] object NoOpDeliveryFlightRecorder extends DeliveryFlightRecorder { + + override def producerCreated(producerId: String, path: ActorPath): Unit = () + override def producerStarted(producerId: String, path: ActorPath): Unit = () + override def producerRequestNext(producerId: String, currentSeqNr: Long, confirmedSeqNr: Long): Unit = () + override def producerSent(producerId: String, seqNr: Long): Unit = () + override def producerWaitingForRequest(producerId: String, currentSeqNr: Long): Unit = () + override def producerResentUnconfirmed(producerId: String, fromSeqNr: Long, toSeqNr: Long): Unit = () + override def producerResentFirst(producerId: String, firstSeqNr: Long): Unit = () + override def producerResentFirstUnconfirmed(producerId: String, seqNr: Long): Unit = () + override def producerReceived(producerId: String, currentSeqNr: Long): Unit = () + override def producerReceivedRequest(producerId: String, requestedSeqNr: Long): Unit = () + override def producerReceivedResend(producerId: String, fromSeqNr: Long): Unit = () + + override def consumerCreated(path: ActorPath): Unit = () + override def consumerStarted(path: ActorPath): Unit = () + override def consumerReceived(producerId: String, seqNr: Long): Unit = () + override def consumerReceivedPreviousInProgress(producerId: String, seqNr: Long, stashed: Int): Unit = () + override def consumerDuplicate(pid: String, expectedSeqNr: Long, seqNr: Long): Unit = () + override def consumerMissing(pid: String, expectedSeqNr: Long, seqNr: Long): Unit = () + override def consumerReceivedResend(seqNr: Long): Unit = () + override def consumerSentRequest(producerId: String, requestedSeqNr: Long): Unit = () + override def consumerChangedProducer(producerId: String): Unit = () + +} diff --git a/akka-bench-jmh/src/main/resources/logback-test.xml b/akka-bench-jmh/src/main/resources/logback.xml similarity index 76% rename from akka-bench-jmh/src/main/resources/logback-test.xml rename to akka-bench-jmh/src/main/resources/logback.xml index 84ff1fe425..8beec7b3f1 100644 --- a/akka-bench-jmh/src/main/resources/logback-test.xml +++ b/akka-bench-jmh/src/main/resources/logback.xml @@ -4,14 +4,11 @@ - - INFO - %date{ISO8601} %-5level %logger %marker - %msg {%mdc}%n - + diff --git a/akka-bench-jmh/src/main/scala/akka/actor/typed/delivery/ReliableDeliveryBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/actor/typed/delivery/ReliableDeliveryBenchmark.scala new file mode 100644 index 0000000000..575fbd5e4e --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/actor/typed/delivery/ReliableDeliveryBenchmark.scala @@ -0,0 +1,157 @@ +/* + * Copyright (C) 2014-2020 Lightbend Inc. + */ + +package akka.actor.typed.delivery + +import java.util.UUID +import java.util.concurrent.TimeUnit + +import scala.concurrent.Await +import scala.concurrent.duration._ + +import akka.Done +import akka.actor.typed.ActorRef +import akka.actor.typed.ActorSystem +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.AskPattern._ +import akka.actor.typed.scaladsl.Behaviors +import com.typesafe.config.ConfigFactory +import org.openjdk.jmh.annotations._ + +object ReliableDeliveryBenchmark { + + final val messagesPerOperation = 100000 + final val timeout = 30.seconds + + object Producer { + trait Command + + case object Run extends Command + private case class WrappedRequestNext(r: ProducerController.RequestNext[Consumer.Command]) extends Command + + def apply( + numberOfMessages: Int, + producerController: ActorRef[ProducerController.Command[Consumer.Command]]): Behavior[Command] = { + Behaviors.setup { context => + val requestNextAdapter = + context.messageAdapter[ProducerController.RequestNext[Consumer.Command]](WrappedRequestNext(_)) + + Behaviors.receiveMessage { + case WrappedRequestNext(next) => + if (next.confirmedSeqNr >= numberOfMessages) { + context.log.info("Completed {} messages", numberOfMessages) + Behaviors.stopped + } else { + next.sendNextTo ! Consumer.TheMessage + Behaviors.same + } + + case Run => + context.log.info("Starting {} messages", numberOfMessages) + producerController ! ProducerController.Start(requestNextAdapter) + Behaviors.same + } + } + } + } + + object Consumer { + trait Command + + case object TheMessage extends Command + + private case class WrappedDelivery(d: ConsumerController.Delivery[Command]) extends Command + + def apply(consumerController: ActorRef[ConsumerController.Command[Command]]): Behavior[Command] = { + Behaviors.setup { context => + val deliveryAdapter = + context.messageAdapter[ConsumerController.Delivery[Command]](WrappedDelivery(_)) + consumerController ! ConsumerController.Start(deliveryAdapter) + + Behaviors.receiveMessagePartial { + case WrappedDelivery(d @ ConsumerController.Delivery(_, confirmTo)) => + context.log.trace("Processed {}", d.seqNr) + confirmTo ! ConsumerController.Confirmed + Behaviors.same + } + } + } + } + + object Guardian { + + trait Command + final case class Run(id: String, numberOfMessages: Int, replyTo: ActorRef[Done]) extends Command + final case class ProducerTerminated(consumer: ActorRef[Consumer.Command], replyTo: ActorRef[Done]) extends Command + + def apply(): Behavior[Command] = { + Behaviors.setup { context => + Behaviors.receiveMessage { + case Run(id, numberOfMessages, replyTo) => + val consumerController = context.spawn(ConsumerController[Consumer.Command](), s"consumerController-$id") + val consumer = context.spawn(Consumer(consumerController), s"consumer-$id") + + val producerController = context.spawn( + ProducerController[Consumer.Command](id, durableQueueBehavior = None), + s"producerController-$id") + val producer = context.spawn(Producer(numberOfMessages, producerController), s"producer-$id") + context.watchWith(producer, ProducerTerminated(consumer, replyTo)) + + consumerController ! ConsumerController.RegisterToProducerController(producerController) + + producer ! Producer.Run + + Behaviors.same + + case ProducerTerminated(consumer, replyTo) => + context.stop(consumer) + replyTo ! Done + Behaviors.same + } + } + } + } +} +@State(Scope.Benchmark) +@BenchmarkMode(Array(Mode.Throughput)) +@Fork(1) +@Threads(1) +@Warmup(iterations = 10, time = 5, timeUnit = TimeUnit.SECONDS, batchSize = 1) +@Measurement(iterations = 10, time = 10, timeUnit = TimeUnit.SECONDS, batchSize = 1) +class ReliableDeliveryBenchmark { + import ReliableDeliveryBenchmark._ + + @Param(Array("10", "50")) + var window = 0 + + implicit var system: ActorSystem[Guardian.Command] = _ + + implicit val askTimeout = akka.util.Timeout(timeout) + + @Setup(Level.Trial) + def setup(): Unit = { + system = ActorSystem( + Guardian(), + "ReliableDeliveryBenchmark", + ConfigFactory.parseString(s""" + akka.loglevel = INFO + akka.reliable-delivery { + consumer-controller.flow-control-window = $window + } + """)) + } + + @TearDown(Level.Trial) + def shutdown(): Unit = { + system.terminate() + Await.ready(system.whenTerminated, 15.seconds) + } + + @Benchmark + @OperationsPerInvocation(messagesPerOperation) + def echo(): Unit = { + Await.result(system.ask(Guardian.Run(UUID.randomUUID().toString, messagesPerOperation, _)), timeout) + } + +} diff --git a/build.sbt b/build.sbt index 87b96a19a5..dc07bccd96 100644 --- a/build.sbt +++ b/build.sbt @@ -271,7 +271,10 @@ lazy val persistenceTck = akkaModule("akka-persistence-tck") .disablePlugins(MimaPlugin) lazy val persistenceTestkit = akkaModule("akka-persistence-testkit") - .dependsOn(persistenceTyped % "compile->compile;provided->provided;test->test", testkit % "compile->compile;test->test", persistenceTck % "test") + .dependsOn( + persistenceTyped % "compile->compile;provided->provided;test->test", + testkit % "compile->compile;test->test", + persistenceTck % "test") .settings(Dependencies.persistenceTestKit) .settings(AutomaticModuleName.settings("akka.persistence.testkit")) .disablePlugins(MimaPlugin) @@ -325,7 +328,7 @@ lazy val remote = lazy val remoteTests = akkaModule("akka-remote-tests") .dependsOn( actorTests % "test->test", - remote % "test->test", + remote % "compile->CompileJdk9;test->test", streamTestkit % "test", multiNodeTestkit, jackson % "test->test") @@ -397,6 +400,7 @@ lazy val actorTyped = akkaModule("akka-actor-typed") import akka.util.Timeout implicit val timeout = Timeout(5.seconds) """) + .enablePlugins(Jdk9) lazy val persistenceTyped = akkaModule("akka-persistence-typed") .dependsOn( @@ -434,11 +438,13 @@ lazy val clusterTyped = akkaModule("akka-cluster-typed") lazy val clusterShardingTyped = akkaModule("akka-cluster-sharding-typed") .dependsOn( + actorTyped % "compile->CompileJdk9", clusterTyped % "compile->compile;test->test;multi-jvm->multi-jvm", clusterSharding, actorTestkitTyped % "test->test", actorTypedTests % "test->test", persistenceTyped % "test->test", + remote % "compile->CompileJdk9;test->test", remoteTests % "test->test", jackson % "test->test") .settings(javacOptions += "-parameters") // for Jackson @@ -465,10 +471,7 @@ lazy val actorTestkitTyped = akkaModule("akka-actor-testkit-typed") .settings(Dependencies.actorTestkitTyped) lazy val actorTypedTests = akkaModule("akka-actor-typed-tests") - .dependsOn( - actorTyped, - actorTestkitTyped % "compile->compile;test->test" - ) + .dependsOn(actorTyped % "compile->CompileJdk9", actorTestkitTyped % "compile->compile;test->test") .settings(AkkaBuild.mayChangeSettings) .disablePlugins(MimaPlugin) .enablePlugins(NoPublish) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 83b76c1d71..f0e99d8740 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -270,7 +270,7 @@ object Dependencies { val docs = l ++= Seq(Test.scalatest, Test.junit, Docs.sprayJson, Docs.gson, Provided.levelDB) - val benchJmh = l ++= Seq(Provided.levelDB, Provided.levelDBNative, Compile.jctools) + val benchJmh = l ++= Seq(logback, Provided.levelDB, Provided.levelDBNative, Compile.jctools) // akka stream