diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ConsumerControllerImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ConsumerControllerImpl.scala index 3041f68af7..0ae382a660 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ConsumerControllerImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ConsumerControllerImpl.scala @@ -228,6 +228,8 @@ private class ConsumerControllerImpl[A]( private val flightRecorder = ActorFlightRecorder(context.system).delivery + private val traceEnabled = context.log.isTraceEnabled + startRetryTimer() private def resendLost = !settings.onlyFlowControl @@ -245,7 +247,7 @@ private class ConsumerControllerImpl[A]( flightRecorder.consumerReceived(pid, seqNr) if (s.isProducerChanged(seqMsg)) { - if (seqMsg.first) + if (seqMsg.first && traceEnabled) context.log.trace("Received first SequencedMessage seqNr [{}], delivering to consumer.", seqNr) receiveChangedProducer(s, seqMsg) } else if (s.registering.isDefined) { @@ -254,7 +256,8 @@ private class ConsumerControllerImpl[A]( seqNr) stashBuffer.unstash(Behaviors.same, 1, scalaIdentityFunction) } else if (s.isNextExpected(seqMsg)) { - context.log.trace("Received SequencedMessage seqNr [{}], delivering to consumer.", seqNr) + if (traceEnabled) + context.log.trace("Received SequencedMessage seqNr [{}], delivering to consumer.", seqNr) deliver(s.copy(receivedSeqNr = seqNr), seqMsg) } else if (seqNr > expectedSeqNr) { flightRecorder.consumerMissing(pid, expectedSeqNr, seqNr) @@ -374,7 +377,7 @@ private class ConsumerControllerImpl[A]( val seqNr = seqMsg.seqNr if (s.isProducerChanged(seqMsg)) { - if (seqMsg.first) + if (seqMsg.first && traceEnabled) context.log.trace("Received first SequencedMessage seqNr [{}], delivering to consumer.", seqNr) receiveChangedProducer(s, seqMsg) } else if (s.registering.isDefined) { @@ -441,7 +444,11 @@ private class ConsumerControllerImpl[A]( .receiveMessage[InternalCommand] { case Confirmed => val seqNr = seqMsg.seqNr - context.log.trace("Received Confirmed seqNr [{}] from consumer, stashed size [{}].", seqNr, stashBuffer.size) + if (traceEnabled) + context.log.trace( + "Received Confirmed seqNr [{}] from consumer, stashed size [{}].", + seqNr, + stashBuffer.size) val newRequestedSeqNr = if (seqMsg.first) { @@ -466,7 +473,8 @@ private class ConsumerControllerImpl[A]( newRequestedSeqNr } else { if (seqMsg.ack) { - context.log.trace("Sending Ack seqNr [{}].", seqNr) + if (traceEnabled) + context.log.trace("Sending Ack seqNr [{}].", seqNr) s.producerController ! Ack(confirmedSeqNr = seqNr) } s.requestedSeqNr @@ -499,7 +507,7 @@ private class ConsumerControllerImpl[A]( "Received SequencedMessage seqNr [{}], discarding message because stash is full.", msg.seqNr) } else { - if (context.log.isTraceEnabled()) + if (traceEnabled) context.log.traceN( "Received SequencedMessage seqNr [{}], stashing while waiting for consumer to confirm [{}], stashed size [{}].", msg.seqNr, diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ProducerControllerImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ProducerControllerImpl.scala index 44a7e33f5c..c08aaa3908 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ProducerControllerImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ProducerControllerImpl.scala @@ -352,6 +352,7 @@ private class ProducerControllerImpl[A: ClassTag]( import ProducerControllerImpl._ private val flightRecorder = ActorFlightRecorder(context.system).delivery + private val traceEnabled = context.log.isTraceEnabled // for the durableQueue StoreMessageSent ask private implicit val askTimeout: Timeout = settings.durableQueueRequestTimeout @@ -359,7 +360,7 @@ private class ProducerControllerImpl[A: ClassTag]( def onMsg(m: A, newReplyAfterStore: Map[SeqNr, ActorRef[SeqNr]], ack: Boolean): Behavior[InternalCommand] = { checkOnMsgRequestedState() - if (context.log.isTraceEnabled) + if (traceEnabled) context.log.trace("Sending [{}] with seqNr [{}].", m.getClass.getName, s.currentSeqNr) val seqMsg = SequencedMessage(producerId, s.currentSeqNr, m, s.currentSeqNr == s.firstSeqNr, ack)(context.self) val newUnconfirmed = @@ -449,7 +450,8 @@ private class ProducerControllerImpl[A: ClassTag]( } def receiveAck(newConfirmedSeqNr: SeqNr): Behavior[InternalCommand] = { - context.log.trace2("Received Ack, confirmed [{}], current [{}].", newConfirmedSeqNr, s.currentSeqNr) + if (traceEnabled) + context.log.trace2("Received Ack, confirmed [{}], current [{}].", newConfirmedSeqNr, s.currentSeqNr) val stateAfterAck = onAck(newConfirmedSeqNr) if (newConfirmedSeqNr == s.firstSeqNr && stateAfterAck.unconfirmed.nonEmpty) { resendUnconfirmed(stateAfterAck.unconfirmed) @@ -459,7 +461,7 @@ private class ProducerControllerImpl[A: ClassTag]( def onAck(newConfirmedSeqNr: SeqNr): State[A] = { val (replies, newReplyAfterStore) = s.replyAfterStore.partition { case (seqNr, _) => seqNr <= newConfirmedSeqNr } - if (replies.nonEmpty) + if (replies.nonEmpty && traceEnabled) context.log.trace("Sending confirmation replies from [{}] to [{}].", replies.head._1, replies.last._1) replies.foreach { case (seqNr, replyTo) => replyTo ! seqNr @@ -489,7 +491,8 @@ private class ProducerControllerImpl[A: ClassTag]( throw new IllegalStateException(s"currentSeqNr [${s.currentSeqNr}] not matching stored seqNr [$seqNr]") s.replyAfterStore.get(seqNr).foreach { replyTo => - context.log.trace("Sending confirmation reply to [{}] after storage.", seqNr) + if (traceEnabled) + context.log.trace("Sending confirmation reply to [{}] after storage.", seqNr) replyTo ! seqNr } val newReplyAfterStore = s.replyAfterStore - seqNr diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/WorkPullingProducerControllerImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/WorkPullingProducerControllerImpl.scala index a5290da946..d6c846b83c 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/WorkPullingProducerControllerImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/WorkPullingProducerControllerImpl.scala @@ -278,6 +278,7 @@ private class WorkPullingProducerControllerImpl[A: ClassTag]( import WorkPullingProducerController.WorkerStats import WorkPullingProducerControllerImpl._ + private val traceEnabled = context.log.isTraceEnabled private val durableQueueAskTimeout: Timeout = settings.producerControllerSettings.durableQueueRequestTimeout private val workerAskTimeout: Timeout = settings.internalAskTimeout @@ -288,7 +289,7 @@ private class WorkPullingProducerControllerImpl[A: ClassTag]( def onMessage(msg: A, wasStashed: Boolean, replyTo: Option[ActorRef[Done]], totalSeqNr: TotalSeqNr): State[A] = { val consumersWithDemand = s.out.iterator.filter { case (_, out) => out.askNextTo.isDefined }.toVector - if (context.log.isTraceEnabled) + if (traceEnabled) context.log.traceN( "Received message seqNr [{}], wasStashed [{}], consumersWithDemand [{}], hasRequested [{}].", totalSeqNr, @@ -337,7 +338,8 @@ private class WorkPullingProducerControllerImpl[A: ClassTag]( } def tellRequestNext(): Unit = { - context.log.trace("Sending RequestNext to producer, seqNr [{}].", totalSeqNr) + if (traceEnabled) + context.log.trace("Sending RequestNext to producer, seqNr [{}].", totalSeqNr) s.producer ! requestNext } @@ -452,7 +454,8 @@ private class WorkPullingProducerControllerImpl[A: ClassTag]( def receiveStoreMessageSentCompleted(seqNr: SeqNr, m: A) = { s.replyAfterStore.get(seqNr).foreach { replyTo => - context.log.trace("Sending reply for seqNr [{}] after storage.", seqNr) + if (traceEnabled) + context.log.trace("Sending reply for seqNr [{}] after storage.", seqNr) replyTo ! Done } @@ -484,7 +487,8 @@ private class WorkPullingProducerControllerImpl[A: ClassTag]( } if (confirmed.nonEmpty) { - context.log.trace("Received Ack seqNr [{}] from worker [{}].", confirmedSeqNr, outState.confirmationQualifier) + if (traceEnabled) + context.log.trace("Received Ack seqNr [{}] from worker [{}].", confirmedSeqNr, outState.confirmationQualifier) confirmed.foreach { case Unconfirmed(_, _, _, None) => // no reply case Unconfirmed(_, _, _, Some(replyTo)) => @@ -509,10 +513,11 @@ private class WorkPullingProducerControllerImpl[A: ClassTag]( s.out.get(outKey) match { case Some(outState) => val confirmedSeqNr = w.next.confirmedSeqNr - context.log.trace2( - "Received RequestNext from worker [{}], confirmedSeqNr [{}].", - w.next.producerId, - confirmedSeqNr) + if (traceEnabled) + context.log.trace2( + "Received RequestNext from worker [{}], confirmedSeqNr [{}].", + w.next.producerId, + confirmedSeqNr) val newUnconfirmed = onAck(outState, confirmedSeqNr) @@ -528,7 +533,9 @@ private class WorkPullingProducerControllerImpl[A: ClassTag]( } else if (s.requested) { active(s.copy(out = newOut)) } else { - context.log.trace("Sending RequestNext to producer after RequestNext from worker [{}].", w.next.producerId) + if (traceEnabled) + context.log + .trace("Sending RequestNext to producer after RequestNext from worker [{}].", w.next.producerId) s.producer ! requestNext active(s.copy(out = newOut, requested = true)) } diff --git a/akka-bench-jmh/src/main/scala/akka/actor/typed/delivery/ReliableDeliveryBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/actor/typed/delivery/ReliableDeliveryBenchmark.scala index ed5ed75a72..7bc51c96c4 100644 --- a/akka-bench-jmh/src/main/scala/akka/actor/typed/delivery/ReliableDeliveryBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/actor/typed/delivery/ReliableDeliveryBenchmark.scala @@ -82,13 +82,15 @@ object Consumer { def apply(consumerController: ActorRef[ConsumerController.Command[Command]]): Behavior[Command] = { Behaviors.setup { context => + val traceEnabled = context.log.isTraceEnabled val deliveryAdapter = context.messageAdapter[ConsumerController.Delivery[Command]](WrappedDelivery(_)) consumerController ! ConsumerController.Start(deliveryAdapter) Behaviors.receiveMessagePartial { case WrappedDelivery(d @ ConsumerController.Delivery(_, confirmTo)) => - context.log.trace("Processed {}", d.seqNr) + if (traceEnabled) + context.log.trace("Processed {}", d.seqNr) confirmTo ! ConsumerController.Confirmed Behaviors.same } diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/delivery/internal/ShardingProducerControllerImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/delivery/internal/ShardingProducerControllerImpl.scala index 3b1665fba0..395a5c47ed 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/delivery/internal/ShardingProducerControllerImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/delivery/internal/ShardingProducerControllerImpl.scala @@ -281,6 +281,7 @@ private class ShardingProducerControllerImpl[A: ClassTag]( private val durableQueueAskTimeout: Timeout = settings.producerControllerSettings.durableQueueRequestTimeout private val entityAskTimeout: Timeout = settings.internalAskTimeout + private val traceEnabled = context.log.isTraceEnabled private val requestNextAdapter: ActorRef[ProducerController.RequestNext[A]] = context.messageAdapter(WrappedRequestNext.apply) @@ -404,7 +405,8 @@ private class ShardingProducerControllerImpl[A: ClassTag]( def receiveAck(ack: Ack): Behavior[InternalCommand] = { s.out.get(ack.outKey) match { case Some(outState) => - context.log.trace2("Received Ack, confirmed [{}], current [{}].", ack.confirmedSeqNr, s.currentSeqNr) + if (traceEnabled) + context.log.trace2("Received Ack, confirmed [{}], current [{}].", ack.confirmedSeqNr, s.currentSeqNr) val newUnconfirmed = onAck(outState, ack.confirmedSeqNr) val newUsedNanoTime = if (newUnconfirmed.size != outState.unconfirmed.size) System.nanoTime() else outState.usedNanoTime @@ -426,7 +428,8 @@ private class ShardingProducerControllerImpl[A: ClassTag]( throw new IllegalStateException(s"Received RequestNext but already has demand for [$outKey]") val confirmedSeqNr = w.next.confirmedSeqNr - context.log.trace("Received RequestNext from [{}], confirmed seqNr [{}]", out.entityId, confirmedSeqNr) + if (traceEnabled) + context.log.trace("Received RequestNext from [{}], confirmed seqNr [{}]", out.entityId, confirmedSeqNr) val newUnconfirmed = onAck(out, confirmedSeqNr) if (out.buffered.nonEmpty) { @@ -573,7 +576,7 @@ private class ShardingProducerControllerImpl[A: ClassTag]( } private def send(msg: A, outKey: OutKey, outSeqNr: OutSeqNr, nextTo: ProducerController.RequestNext[A]): Unit = { - if (context.log.isTraceEnabled) + if (traceEnabled) context.log.traceN("Sending [{}] to [{}] with outSeqNr [{}].", msg.getClass.getName, outKey, outSeqNr) implicit val askTimeout: Timeout = entityAskTimeout context.ask[ProducerController.MessageWithConfirmation[A], OutSeqNr]( diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/delivery/EventSourcedProducerQueue.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/delivery/EventSourcedProducerQueue.scala index 2a091c5c49..b40fdcace4 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/delivery/EventSourcedProducerQueue.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/delivery/EventSourcedProducerQueue.scala @@ -219,14 +219,17 @@ private class EventSourcedProducerQueue[A]( cleanupUnusedAfter: FiniteDuration) { import DurableProducerQueue._ + private val traceEnabled = context.log.isTraceEnabled + def onCommand(state: State[A], command: Command[A]): Effect[Event, State[A]] = { command match { case StoreMessageSent(sent, replyTo) => if (sent.seqNr == state.currentSeqNr) { - context.log.trace( - "StoreMessageSent seqNr [{}], confirmationQualifier [{}]", - sent.seqNr, - sent.confirmationQualifier) + if (traceEnabled) + context.log.trace( + "StoreMessageSent seqNr [{}], confirmationQualifier [{}]", + sent.seqNr, + sent.confirmationQualifier) Effect.persist(sent).thenReply(replyTo)(_ => StoreMessageSentAck(sent.seqNr)) } else if (sent.seqNr == state.currentSeqNr - 1) { // already stored, could be a retry after timout @@ -239,7 +242,11 @@ private class EventSourcedProducerQueue[A]( } case StoreMessageConfirmed(seqNr, confirmationQualifier, timestampMillis) => - context.log.trace("StoreMessageConfirmed seqNr [{}], confirmationQualifier [{}]", seqNr, confirmationQualifier) + if (traceEnabled) + context.log.trace( + "StoreMessageConfirmed seqNr [{}], confirmationQualifier [{}]", + seqNr, + confirmationQualifier) val previousConfirmedSeqNr = state.confirmedSeqNr.get(confirmationQualifier) match { case Some((nr, _)) => nr case None => 0L