From 3ed4dbc1b26c6bee32b05e61de73e6bec2a7350b Mon Sep 17 00:00:00 2001 From: Jacek Ewertowski Date: Fri, 3 Apr 2020 13:14:11 +0200 Subject: [PATCH] Typed: Configurable resend interval for ProducerController (#28779) --- .../src/main/resources/reference.conf | 4 +++ .../typed/delivery/ProducerController.scala | 27 +++++++++++++++---- .../internal/ProducerControllerImpl.scala | 5 ++-- 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/akka-actor-typed/src/main/resources/reference.conf b/akka-actor-typed/src/main/resources/reference.conf index a3476814a3..b3f085dfb5 100644 --- a/akka-actor-typed/src/main/resources/reference.conf +++ b/akka-actor-typed/src/main/resources/reference.conf @@ -76,6 +76,10 @@ akka.reliable-delivery { # The ProducerController retries requests to the durable queue this # number of times before failing. retry-attempts = 10 + + # The ProducerController retries sending the first message with this interval + # until it has been confirmed. + resend-first-interval = 1s } } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/ProducerController.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/ProducerController.scala index e1562ffabc..80222779b4 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/ProducerController.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/ProducerController.scala @@ -151,7 +151,8 @@ object ProducerController { def apply(config: Config): Settings = { new Settings( durableQueueRequestTimeout = config.getDuration("durable-queue.request-timeout").asScala, - durableQueueRetryAttempts = config.getInt("durable-queue.retry-attempts")) + durableQueueRetryAttempts = config.getInt("durable-queue.retry-attempts"), + durableQueueResendFirstInterval = config.getDuration("durable-queue.resend-first-interval").asScala) } /** @@ -169,7 +170,10 @@ object ProducerController { apply(config) } - final class Settings private (val durableQueueRequestTimeout: FiniteDuration, val durableQueueRetryAttempts: Int) { + final class Settings private ( + val durableQueueRequestTimeout: FiniteDuration, + val durableQueueRetryAttempts: Int, + val durableQueueResendFirstInterval: FiniteDuration) { def withDurableQueueRetryAttempts(newDurableQueueRetryAttempts: Int): Settings = copy(durableQueueRetryAttempts = newDurableQueueRetryAttempts) @@ -180,12 +184,24 @@ object ProducerController { def withDurableQueueRequestTimeout(newDurableQueueRequestTimeout: FiniteDuration): Settings = copy(durableQueueRequestTimeout = newDurableQueueRequestTimeout) + /** + * Scala API + */ + def withDurableQueueResendFirstInterval(newDurableQueueResendFirstInterval: FiniteDuration): Settings = + copy(durableQueueResendFirstInterval = newDurableQueueResendFirstInterval) + /** * Java API */ def withDurableQueueRequestTimeout(newDurableQueueRequestTimeout: JavaDuration): Settings = copy(durableQueueRequestTimeout = newDurableQueueRequestTimeout.asScala) + /** + * Java API + */ + def withDurableQueueResendFirstInterval(newDurableQueueResendFirstInterval: JavaDuration): Settings = + copy(durableQueueResendFirstInterval = newDurableQueueResendFirstInterval.asScala) + /** * Java API */ @@ -197,11 +213,12 @@ object ProducerController { */ private def copy( durableQueueRequestTimeout: FiniteDuration = durableQueueRequestTimeout, - durableQueueRetryAttempts: Int = durableQueueRetryAttempts) = - new Settings(durableQueueRequestTimeout, durableQueueRetryAttempts) + durableQueueRetryAttempts: Int = durableQueueRetryAttempts, + durableQueueResendFirstInterval: FiniteDuration = durableQueueResendFirstInterval) = + new Settings(durableQueueRequestTimeout, durableQueueRetryAttempts, durableQueueResendFirstInterval) override def toString: String = - s"Settings($durableQueueRequestTimeout, $durableQueueRetryAttempts)" + s"Settings($durableQueueRequestTimeout, $durableQueueRetryAttempts, $durableQueueResendFirstInterval)" } def apply[A: ClassTag]( diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ProducerControllerImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ProducerControllerImpl.scala index 00e9ff6794..40d26ece1e 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ProducerControllerImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ProducerControllerImpl.scala @@ -6,7 +6,6 @@ package akka.actor.typed.delivery.internal import java.util.concurrent.TimeoutException -import scala.concurrent.duration._ import scala.reflect.ClassTag import scala.util.Failure import scala.util.Success @@ -367,7 +366,7 @@ private class ProducerControllerImpl[A: ClassTag]( else Vector.empty // no resending, no need to keep unconfirmed if (s.currentSeqNr == s.firstSeqNr) - timers.startTimerWithFixedDelay(ResendFirst, ResendFirst, 1.second) + timers.startTimerWithFixedDelay(ResendFirst, delay = settings.durableQueueResendFirstInterval) flightRecorder.producerSent(producerId, seqMsg.seqNr) s.send(seqMsg) @@ -559,7 +558,7 @@ private class ProducerControllerImpl[A: ClassTag]( consumerController, newFirstSeqNr) if (s.unconfirmed.nonEmpty) { - timers.startTimerWithFixedDelay(ResendFirst, ResendFirst, 1.second) + timers.startTimerWithFixedDelay(ResendFirst, delay = settings.durableQueueResendFirstInterval) context.self ! ResendFirst } // update the send function