Typed: Configurable resend interval for ProducerController (#28779)

This commit is contained in:
Jacek Ewertowski 2020-04-03 13:14:11 +02:00 committed by GitHub
parent 6e17f3e504
commit 3ed4dbc1b2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 28 additions and 8 deletions

View file

@ -76,6 +76,10 @@ akka.reliable-delivery {
# The ProducerController retries requests to the durable queue this # The ProducerController retries requests to the durable queue this
# number of times before failing. # number of times before failing.
retry-attempts = 10 retry-attempts = 10
# The ProducerController retries sending the first message with this interval
# until it has been confirmed.
resend-first-interval = 1s
} }
} }

View file

@ -151,7 +151,8 @@ object ProducerController {
def apply(config: Config): Settings = { def apply(config: Config): Settings = {
new Settings( new Settings(
durableQueueRequestTimeout = config.getDuration("durable-queue.request-timeout").asScala, durableQueueRequestTimeout = config.getDuration("durable-queue.request-timeout").asScala,
durableQueueRetryAttempts = config.getInt("durable-queue.retry-attempts")) durableQueueRetryAttempts = config.getInt("durable-queue.retry-attempts"),
durableQueueResendFirstInterval = config.getDuration("durable-queue.resend-first-interval").asScala)
} }
/** /**
@ -169,7 +170,10 @@ object ProducerController {
apply(config) apply(config)
} }
final class Settings private (val durableQueueRequestTimeout: FiniteDuration, val durableQueueRetryAttempts: Int) { final class Settings private (
val durableQueueRequestTimeout: FiniteDuration,
val durableQueueRetryAttempts: Int,
val durableQueueResendFirstInterval: FiniteDuration) {
def withDurableQueueRetryAttempts(newDurableQueueRetryAttempts: Int): Settings = def withDurableQueueRetryAttempts(newDurableQueueRetryAttempts: Int): Settings =
copy(durableQueueRetryAttempts = newDurableQueueRetryAttempts) copy(durableQueueRetryAttempts = newDurableQueueRetryAttempts)
@ -180,12 +184,24 @@ object ProducerController {
def withDurableQueueRequestTimeout(newDurableQueueRequestTimeout: FiniteDuration): Settings = def withDurableQueueRequestTimeout(newDurableQueueRequestTimeout: FiniteDuration): Settings =
copy(durableQueueRequestTimeout = newDurableQueueRequestTimeout) copy(durableQueueRequestTimeout = newDurableQueueRequestTimeout)
/**
* Scala API
*/
def withDurableQueueResendFirstInterval(newDurableQueueResendFirstInterval: FiniteDuration): Settings =
copy(durableQueueResendFirstInterval = newDurableQueueResendFirstInterval)
/** /**
* Java API * Java API
*/ */
def withDurableQueueRequestTimeout(newDurableQueueRequestTimeout: JavaDuration): Settings = def withDurableQueueRequestTimeout(newDurableQueueRequestTimeout: JavaDuration): Settings =
copy(durableQueueRequestTimeout = newDurableQueueRequestTimeout.asScala) copy(durableQueueRequestTimeout = newDurableQueueRequestTimeout.asScala)
/**
* Java API
*/
def withDurableQueueResendFirstInterval(newDurableQueueResendFirstInterval: JavaDuration): Settings =
copy(durableQueueResendFirstInterval = newDurableQueueResendFirstInterval.asScala)
/** /**
* Java API * Java API
*/ */
@ -197,11 +213,12 @@ object ProducerController {
*/ */
private def copy( private def copy(
durableQueueRequestTimeout: FiniteDuration = durableQueueRequestTimeout, durableQueueRequestTimeout: FiniteDuration = durableQueueRequestTimeout,
durableQueueRetryAttempts: Int = durableQueueRetryAttempts) = durableQueueRetryAttempts: Int = durableQueueRetryAttempts,
new Settings(durableQueueRequestTimeout, durableQueueRetryAttempts) durableQueueResendFirstInterval: FiniteDuration = durableQueueResendFirstInterval) =
new Settings(durableQueueRequestTimeout, durableQueueRetryAttempts, durableQueueResendFirstInterval)
override def toString: String = override def toString: String =
s"Settings($durableQueueRequestTimeout, $durableQueueRetryAttempts)" s"Settings($durableQueueRequestTimeout, $durableQueueRetryAttempts, $durableQueueResendFirstInterval)"
} }
def apply[A: ClassTag]( def apply[A: ClassTag](

View file

@ -6,7 +6,6 @@ package akka.actor.typed.delivery.internal
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
import scala.concurrent.duration._
import scala.reflect.ClassTag import scala.reflect.ClassTag
import scala.util.Failure import scala.util.Failure
import scala.util.Success import scala.util.Success
@ -367,7 +366,7 @@ private class ProducerControllerImpl[A: ClassTag](
else Vector.empty // no resending, no need to keep unconfirmed else Vector.empty // no resending, no need to keep unconfirmed
if (s.currentSeqNr == s.firstSeqNr) if (s.currentSeqNr == s.firstSeqNr)
timers.startTimerWithFixedDelay(ResendFirst, ResendFirst, 1.second) timers.startTimerWithFixedDelay(ResendFirst, delay = settings.durableQueueResendFirstInterval)
flightRecorder.producerSent(producerId, seqMsg.seqNr) flightRecorder.producerSent(producerId, seqMsg.seqNr)
s.send(seqMsg) s.send(seqMsg)
@ -559,7 +558,7 @@ private class ProducerControllerImpl[A: ClassTag](
consumerController, consumerController,
newFirstSeqNr) newFirstSeqNr)
if (s.unconfirmed.nonEmpty) { if (s.unconfirmed.nonEmpty) {
timers.startTimerWithFixedDelay(ResendFirst, ResendFirst, 1.second) timers.startTimerWithFixedDelay(ResendFirst, delay = settings.durableQueueResendFirstInterval)
context.self ! ResendFirst context.self ! ResendFirst
} }
// update the send function // update the send function