optimize AtLeastOnceDelivery by not scheduling ticks when not needed, #26216

* when there are no pending unconfirmed messages the redelivery tick
  is not needed
* before the change 100k idle actors used around 25% CPU on my machine
* dropped to almost 0% after the change (ofc)
* not using Timers API because that would not be binary compatible
This commit is contained in:
Patrik Nordwall 2019-01-15 13:38:50 +01:00
parent e6c81d317c
commit b06b9e7442

View file

@ -6,9 +6,11 @@ package akka.persistence
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import akka.actor.{ ActorPath, ActorSelection, NotInfluenceReceiveTimeout } import akka.actor.{ ActorPath, ActorSelection, NotInfluenceReceiveTimeout }
import akka.persistence.serialization.Message import akka.persistence.serialization.Message
import akka.actor.Cancellable import akka.actor.Cancellable
import akka.actor.DeadLetterSuppression
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.persistence.AtLeastOnceDelivery.Internal.Delivery import akka.persistence.AtLeastOnceDelivery.Internal.Delivery
import akka.util.ccompat._ import akka.util.ccompat._
@ -70,7 +72,7 @@ object AtLeastOnceDelivery {
*/ */
private[akka] object Internal { private[akka] object Internal {
case class Delivery(destination: ActorPath, message: Any, timestamp: Long, attempt: Int) case class Delivery(destination: ActorPath, message: Any, timestamp: Long, attempt: Int)
case object RedeliveryTick extends NotInfluenceReceiveTimeout case object RedeliveryTick extends NotInfluenceReceiveTimeout with DeadLetterSuppression
} }
} }
@ -233,9 +235,16 @@ trait AtLeastOnceDeliveryLike extends Eventsourced {
private var unconfirmed = immutable.SortedMap.empty[Long, Delivery] private var unconfirmed = immutable.SortedMap.empty[Long, Delivery]
private def startRedeliverTask(): Unit = { private def startRedeliverTask(): Unit = {
val interval = redeliverInterval / 2 if (redeliverTask.isEmpty) {
redeliverTask = Some( val interval = redeliverInterval / 2
context.system.scheduler.schedule(interval, interval, self, RedeliveryTick)(context.dispatcher)) redeliverTask = Some(
context.system.scheduler.schedule(interval, interval, self, RedeliveryTick)(context.dispatcher))
}
}
private def cancelRedeliveryTask(): Unit = {
redeliverTask.foreach(_.cancel())
redeliverTask = None
} }
private def nextDeliverySequenceNr(): Long = { private def nextDeliverySequenceNr(): Long = {
@ -283,6 +292,8 @@ trait AtLeastOnceDeliveryLike extends Eventsourced {
def confirmDelivery(deliveryId: Long): Boolean = { def confirmDelivery(deliveryId: Long): Boolean = {
if (unconfirmed.contains(deliveryId)) { if (unconfirmed.contains(deliveryId)) {
unconfirmed -= deliveryId unconfirmed -= deliveryId
if (unconfirmed.isEmpty)
cancelRedeliveryTask()
true true
} else false } else false
} }
@ -316,6 +327,7 @@ trait AtLeastOnceDeliveryLike extends Eventsourced {
private def send(deliveryId: Long, d: Delivery, timestamp: Long): Unit = { private def send(deliveryId: Long, d: Delivery, timestamp: Long): Unit = {
context.actorSelection(d.destination) ! d.message context.actorSelection(d.destination) ! d.message
unconfirmed = unconfirmed.updated(deliveryId, d.copy(timestamp = timestamp, attempt = d.attempt + 1)) unconfirmed = unconfirmed.updated(deliveryId, d.copy(timestamp = timestamp, attempt = d.attempt + 1))
startRedeliverTask()
} }
/** /**
@ -349,7 +361,7 @@ trait AtLeastOnceDeliveryLike extends Eventsourced {
* INTERNAL API * INTERNAL API
*/ */
override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = { override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = {
redeliverTask.foreach(_.cancel()) cancelRedeliveryTask()
super.aroundPreRestart(reason, message) super.aroundPreRestart(reason, message)
} }
@ -357,13 +369,15 @@ trait AtLeastOnceDeliveryLike extends Eventsourced {
* INTERNAL API * INTERNAL API
*/ */
override protected[akka] def aroundPostStop(): Unit = { override protected[akka] def aroundPostStop(): Unit = {
redeliverTask.foreach(_.cancel()) cancelRedeliveryTask()
super.aroundPostStop() super.aroundPostStop()
} }
override private[akka] def onReplaySuccess(): Unit = { override private[akka] def onReplaySuccess(): Unit = {
redeliverOverdue() if (unconfirmed.nonEmpty) {
startRedeliverTask() redeliverOverdue()
startRedeliverTask()
}
super.onReplaySuccess() super.onReplaySuccess()
} }
@ -375,7 +389,7 @@ trait AtLeastOnceDeliveryLike extends Eventsourced {
case RedeliveryTick case RedeliveryTick
redeliverOverdue() redeliverOverdue()
case x case _
super.aroundReceive(receive, message) super.aroundReceive(receive, message)
} }
} }