From b06b9e74428183bc80b94839207798ff17c16aa8 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 15 Jan 2019 13:38:50 +0100 Subject: [PATCH] optimize AtLeastOnceDelivery by not scheduling ticks when not needed, #26216 * when there are no pending unconfirmed messages the redelivery tick is not needed * before the change 100k idle actors used around 25% CPU on my machine * dropped to almost 0% after the change (ofc) * not using Timers API because that would not be binary compatible --- .../persistence/AtLeastOnceDelivery.scala | 32 +++++++++++++------ 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala b/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala index 511276c732..c2b68b4979 100644 --- a/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala +++ b/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala @@ -6,9 +6,11 @@ package akka.persistence import scala.collection.immutable import scala.concurrent.duration.FiniteDuration + import akka.actor.{ ActorPath, ActorSelection, NotInfluenceReceiveTimeout } import akka.persistence.serialization.Message import akka.actor.Cancellable +import akka.actor.DeadLetterSuppression import akka.annotation.InternalApi import akka.persistence.AtLeastOnceDelivery.Internal.Delivery import akka.util.ccompat._ @@ -70,7 +72,7 @@ object AtLeastOnceDelivery { */ private[akka] object Internal { case class Delivery(destination: ActorPath, message: Any, timestamp: Long, attempt: Int) - case object RedeliveryTick extends NotInfluenceReceiveTimeout + case object RedeliveryTick extends NotInfluenceReceiveTimeout with DeadLetterSuppression } } @@ -233,9 +235,16 @@ trait AtLeastOnceDeliveryLike extends Eventsourced { private var unconfirmed = immutable.SortedMap.empty[Long, Delivery] private def startRedeliverTask(): Unit = { - val interval = redeliverInterval / 2 - redeliverTask = Some( - context.system.scheduler.schedule(interval, interval, self, RedeliveryTick)(context.dispatcher)) + if (redeliverTask.isEmpty) { + val interval = redeliverInterval / 2 + redeliverTask = Some( + context.system.scheduler.schedule(interval, interval, self, RedeliveryTick)(context.dispatcher)) + } + } + + private def cancelRedeliveryTask(): Unit = { + redeliverTask.foreach(_.cancel()) + redeliverTask = None } private def nextDeliverySequenceNr(): Long = { @@ -283,6 +292,8 @@ trait AtLeastOnceDeliveryLike extends Eventsourced { def confirmDelivery(deliveryId: Long): Boolean = { if (unconfirmed.contains(deliveryId)) { unconfirmed -= deliveryId + if (unconfirmed.isEmpty) + cancelRedeliveryTask() true } else false } @@ -316,6 +327,7 @@ trait AtLeastOnceDeliveryLike extends Eventsourced { private def send(deliveryId: Long, d: Delivery, timestamp: Long): Unit = { context.actorSelection(d.destination) ! d.message unconfirmed = unconfirmed.updated(deliveryId, d.copy(timestamp = timestamp, attempt = d.attempt + 1)) + startRedeliverTask() } /** @@ -349,7 +361,7 @@ trait AtLeastOnceDeliveryLike extends Eventsourced { * INTERNAL API */ override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = { - redeliverTask.foreach(_.cancel()) + cancelRedeliveryTask() super.aroundPreRestart(reason, message) } @@ -357,13 +369,15 @@ trait AtLeastOnceDeliveryLike extends Eventsourced { * INTERNAL API */ override protected[akka] def aroundPostStop(): Unit = { - redeliverTask.foreach(_.cancel()) + cancelRedeliveryTask() super.aroundPostStop() } override private[akka] def onReplaySuccess(): Unit = { - redeliverOverdue() - startRedeliverTask() + if (unconfirmed.nonEmpty) { + redeliverOverdue() + startRedeliverTask() + } super.onReplaySuccess() } @@ -375,7 +389,7 @@ trait AtLeastOnceDeliveryLike extends Eventsourced { case RedeliveryTick ⇒ redeliverOverdue() - case x ⇒ + case _ ⇒ super.aroundReceive(receive, message) } }