Dynamically adjust the ConsumerController's resend interval for Request, #28720
* increase the interval when idle in active Behavior, no incoming messages * still avoiding rescheduling the timer for each message * when there is a steady flow of incoming messages it will only send the Request messages after half flow-control-window * changed config property resend-interval to resend-interval-min and resend-interval-max * the change of this setting is not backwards compatible but ApiMayChange and probably nobody using that setting yet * * fix ReliableDeliveryRandomSpec * mima filter
This commit is contained in:
parent
bb4940b027
commit
8865ca44d6
7 changed files with 114 additions and 46 deletions
|
|
@ -4,6 +4,8 @@
|
|||
|
||||
package akka.actor.typed.delivery
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import org.scalatest.wordspec.AnyWordSpecLike
|
||||
|
||||
import akka.actor.testkit.typed.scaladsl.LogCapturing
|
||||
|
|
@ -12,12 +14,12 @@ import akka.actor.typed.delivery.ConsumerController.DeliverThenStop
|
|||
import akka.actor.typed.delivery.internal.ConsumerControllerImpl
|
||||
import akka.actor.typed.delivery.internal.ProducerControllerImpl
|
||||
|
||||
class ConsumerControllerSpec
|
||||
extends ScalaTestWithActorTestKit("""
|
||||
akka.reliable-delivery.consumer-controller.flow-control-window = 20
|
||||
""")
|
||||
with AnyWordSpecLike
|
||||
with LogCapturing {
|
||||
class ConsumerControllerSpec extends ScalaTestWithActorTestKit("""
|
||||
akka.reliable-delivery.consumer-controller {
|
||||
flow-control-window = 20
|
||||
resend-interval-min = 1s
|
||||
}
|
||||
""") with AnyWordSpecLike with LogCapturing {
|
||||
import TestConsumer.sequencedMessage
|
||||
|
||||
private var idCount = 0
|
||||
|
|
@ -62,6 +64,7 @@ class ConsumerControllerSpec
|
|||
val producerControllerProbe2 = createTestProbe[ProducerControllerImpl.InternalCommand]()
|
||||
consumerController ! ConsumerController.RegisterToProducerController(producerControllerProbe2.ref)
|
||||
producerControllerProbe2.expectMessage(ProducerController.RegisterConsumer(consumerController))
|
||||
consumerProbe.receiveMessage().confirmTo ! ConsumerController.Confirmed
|
||||
// expected resend
|
||||
producerControllerProbe2.expectMessage(ProducerController.RegisterConsumer(consumerController))
|
||||
|
||||
|
|
@ -80,14 +83,12 @@ class ConsumerControllerSpec
|
|||
val consumerProbe = createTestProbe[ConsumerController.Delivery[TestConsumer.Job]]()
|
||||
consumerController ! ConsumerController.Start(consumerProbe.ref)
|
||||
|
||||
consumerProbe.expectMessageType[ConsumerController.Delivery[TestConsumer.Job]]
|
||||
|
||||
producerControllerProbe.expectMessage(ProducerControllerImpl.Request(0, 20, true, false))
|
||||
producerControllerProbe.expectMessage(ProducerControllerImpl.Request(0, 20, true, true))
|
||||
|
||||
consumerController ! ConsumerController.Confirmed
|
||||
producerControllerProbe.expectMessage(ProducerControllerImpl.Request(1, 20, true, false))
|
||||
|
||||
producerControllerProbe.expectMessage(ProducerControllerImpl.Request(1, 20, true, true))
|
||||
|
||||
testKit.stop(consumerController)
|
||||
}
|
||||
|
||||
|
|
@ -185,15 +186,17 @@ class ConsumerControllerSpec
|
|||
consumerController ! sequencedMessage(producerId, 2, producerControllerProbe.ref)
|
||||
consumerProbe.expectMessageType[ConsumerController.Delivery[TestConsumer.Job]]
|
||||
consumerController ! ConsumerController.Confirmed
|
||||
producerControllerProbe.expectMessage(ProducerControllerImpl.Request(2, 20, true, true))
|
||||
|
||||
consumerController ! sequencedMessage(producerId, 3, producerControllerProbe.ref)
|
||||
consumerProbe.expectMessageType[ConsumerController.Delivery[TestConsumer.Job]].seqNr should ===(3)
|
||||
|
||||
producerControllerProbe.expectMessage(ProducerControllerImpl.Request(2, 20, true, true))
|
||||
|
||||
consumerController ! ConsumerController.Confirmed
|
||||
producerControllerProbe.expectMessage(ProducerControllerImpl.Request(3, 20, true, true))
|
||||
|
||||
// exponential back, so now it should be more than 1 sec
|
||||
producerControllerProbe.expectNoMessage(1.1.second)
|
||||
producerControllerProbe.expectMessage(ProducerControllerImpl.Request(3, 20, true, true))
|
||||
|
||||
testKit.stop(consumerController)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ import akka.actor.typed.delivery.internal.ProducerControllerImpl
|
|||
class DurableProducerControllerSpec
|
||||
extends ScalaTestWithActorTestKit("""
|
||||
akka.reliable-delivery.consumer-controller.flow-control-window = 20
|
||||
akka.reliable-delivery.consumer-controller.resend-interval-min = 1s
|
||||
""")
|
||||
with AnyWordSpecLike
|
||||
with LogCapturing {
|
||||
|
|
|
|||
|
|
@ -42,12 +42,13 @@ object ReliableDeliveryRandomSpec {
|
|||
}
|
||||
}
|
||||
|
||||
class ReliableDeliveryRandomSpec
|
||||
extends ScalaTestWithActorTestKit("""
|
||||
akka.reliable-delivery.consumer-controller.flow-control-window = 20
|
||||
""")
|
||||
with AnyWordSpecLike
|
||||
with LogCapturing {
|
||||
class ReliableDeliveryRandomSpec extends ScalaTestWithActorTestKit("""
|
||||
akka.reliable-delivery.consumer-controller {
|
||||
flow-control-window = 20
|
||||
resend-interval-min = 500 ms
|
||||
resend-interval-max = 2 s
|
||||
}
|
||||
""") with AnyWordSpecLike with LogCapturing {
|
||||
import ReliableDeliveryRandomSpec._
|
||||
|
||||
private var idCount = 0
|
||||
|
|
|
|||
|
|
@ -1,2 +1,9 @@
|
|||
# Changes to internals of reliable delivery
|
||||
ProblemFilters.exclude[Problem]("akka.actor.typed.delivery.internal.*")
|
||||
|
||||
# #28720 Dynamically adjust the ConsumerController's resend interval
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.delivery.ConsumerController#Settings.resendInterval")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.delivery.ConsumerController#Settings.withResendInterval")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.delivery.ConsumerController#Settings.withResendInterval")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.delivery.ConsumerController#Settings.getResendInterval")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.delivery.ConsumerController#Settings.this")
|
||||
|
|
|
|||
|
|
@ -90,8 +90,10 @@ akka.reliable-delivery {
|
|||
flow-control-window = 50
|
||||
|
||||
# The ConsumerController resends flow control messages to the
|
||||
# ProducerController with this interval.
|
||||
resend-interval = 1s
|
||||
# ProducerController with the resend-interval-min, and increasing
|
||||
# it gradually to resend-interval-max when idle.
|
||||
resend-interval-min = 2s
|
||||
resend-interval-max = 30s
|
||||
|
||||
# If this is enabled lost messages will not be resent, but flow control is used.
|
||||
# This can be more efficient since messages don't have to be
|
||||
|
|
|
|||
|
|
@ -162,7 +162,8 @@ object ConsumerController {
|
|||
def apply(config: Config): Settings = {
|
||||
new Settings(
|
||||
flowControlWindow = config.getInt("flow-control-window"),
|
||||
resendInterval = config.getDuration("resend-interval").asScala,
|
||||
resendIntervalMin = config.getDuration("resend-interval-min").asScala,
|
||||
resendIntervalMax = config.getDuration("resend-interval-max").asScala,
|
||||
onlyFlowControl = config.getBoolean("only-flow-control"))
|
||||
}
|
||||
|
||||
|
|
@ -183,7 +184,8 @@ object ConsumerController {
|
|||
|
||||
final class Settings private (
|
||||
val flowControlWindow: Int,
|
||||
val resendInterval: FiniteDuration,
|
||||
val resendIntervalMin: FiniteDuration,
|
||||
val resendIntervalMax: FiniteDuration,
|
||||
val onlyFlowControl: Boolean) {
|
||||
|
||||
def withFlowControlWindow(newFlowControlWindow: Int): Settings =
|
||||
|
|
@ -192,20 +194,32 @@ object ConsumerController {
|
|||
/**
|
||||
* Scala API
|
||||
*/
|
||||
def withResendInterval(newResendInterval: FiniteDuration): Settings =
|
||||
copy(resendInterval = newResendInterval)
|
||||
def withResendIntervalMin(newResendIntervalMin: FiniteDuration): Settings =
|
||||
copy(resendIntervalMin = newResendIntervalMin)
|
||||
|
||||
/**
|
||||
* Scala API
|
||||
*/
|
||||
def withResendIntervalMax(newResendIntervalMax: FiniteDuration): Settings =
|
||||
copy(resendIntervalMax = newResendIntervalMax)
|
||||
|
||||
/**
|
||||
* Java API
|
||||
*/
|
||||
def withResendInterval(newResendInterval: JavaDuration): Settings =
|
||||
copy(resendInterval = newResendInterval.asScala)
|
||||
def withResendIntervalMin(newResendIntervalMin: JavaDuration): Settings =
|
||||
copy(resendIntervalMin = newResendIntervalMin.asScala)
|
||||
|
||||
/**
|
||||
* Java API
|
||||
*/
|
||||
def getResendInterval(): JavaDuration =
|
||||
resendInterval.asJava
|
||||
def withResendIntervalMax(newResendIntervalMax: JavaDuration): Settings =
|
||||
copy(resendIntervalMax = newResendIntervalMax.asScala)
|
||||
|
||||
/**
|
||||
* Java API
|
||||
*/
|
||||
def getResendIntervalMax(): JavaDuration =
|
||||
resendIntervalMax.asJava
|
||||
|
||||
def withOnlyFlowControl(newOnlyFlowControl: Boolean): Settings =
|
||||
copy(onlyFlowControl = newOnlyFlowControl)
|
||||
|
|
@ -215,12 +229,13 @@ object ConsumerController {
|
|||
*/
|
||||
private def copy(
|
||||
flowControlWindow: Int = flowControlWindow,
|
||||
resendInterval: FiniteDuration = resendInterval,
|
||||
resendIntervalMin: FiniteDuration = resendIntervalMin,
|
||||
resendIntervalMax: FiniteDuration = resendIntervalMax,
|
||||
onlyFlowControl: Boolean = onlyFlowControl) =
|
||||
new Settings(flowControlWindow, resendInterval, onlyFlowControl)
|
||||
new Settings(flowControlWindow, resendIntervalMin, resendIntervalMax, onlyFlowControl)
|
||||
|
||||
override def toString: String =
|
||||
s"Settings($flowControlWindow, $resendInterval, $onlyFlowControl)"
|
||||
s"Settings($flowControlWindow, $resendIntervalMin, $onlyFlowControl)"
|
||||
}
|
||||
|
||||
def apply[A](): Behavior[Command[A]] =
|
||||
|
|
|
|||
|
|
@ -4,6 +4,8 @@
|
|||
|
||||
package akka.actor.typed.delivery.internal
|
||||
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.actor.typed.PostStop
|
||||
|
|
@ -128,8 +130,9 @@ import akka.util.ConstantFun.scalaIdentityFunction
|
|||
context.watchWith(s.deliverTo, ConsumerTerminated(s.deliverTo))
|
||||
|
||||
flightRecorder.consumerStarted(context.self.path)
|
||||
val retryTimer = new RetryTimer(timers, settings.resendIntervalMin, settings.resendIntervalMax)
|
||||
val activeBehavior =
|
||||
new ConsumerControllerImpl[A](context, timers, stashBuffer, settings)
|
||||
new ConsumerControllerImpl[A](context, retryTimer, stashBuffer, settings)
|
||||
.active(initialState(context, s, registering, stopping))
|
||||
context.log.debug("Received Start, unstash [{}] messages.", stashBuffer.size)
|
||||
stashBuffer.unstash(activeBehavior, 1, scalaIdentityFunction)
|
||||
|
|
@ -171,7 +174,7 @@ import akka.util.ConstantFun.scalaIdentityFunction
|
|||
|
||||
}
|
||||
|
||||
timers.startTimerWithFixedDelay(Retry, Retry, settings.resendInterval)
|
||||
timers.startTimerWithFixedDelay(Retry, Retry, settings.resendIntervalMin)
|
||||
waitForStart(None, stopping = false)
|
||||
}
|
||||
}
|
||||
|
|
@ -207,11 +210,47 @@ import akka.util.ConstantFun.scalaIdentityFunction
|
|||
if (ref.path.address.hasGlobalScope)
|
||||
throw new IllegalArgumentException(s"Consumer [$ref] should be local.")
|
||||
}
|
||||
|
||||
private class RetryTimer(
|
||||
timers: TimerScheduler[ConsumerControllerImpl.InternalCommand],
|
||||
val minBackoff: FiniteDuration,
|
||||
maxBackoff: FiniteDuration) {
|
||||
private var _interval = minBackoff
|
||||
|
||||
def interval(): FiniteDuration =
|
||||
_interval
|
||||
|
||||
def start(): Unit = {
|
||||
_interval = minBackoff
|
||||
timers.startTimerWithFixedDelay(Retry, _interval)
|
||||
}
|
||||
|
||||
def scheduleNext(): Unit = {
|
||||
val newInterval =
|
||||
if (_interval eq maxBackoff)
|
||||
maxBackoff
|
||||
else
|
||||
maxBackoff.min(_interval * 1.5) match {
|
||||
case f: FiniteDuration => f
|
||||
case _ => maxBackoff
|
||||
}
|
||||
if (newInterval != _interval) {
|
||||
_interval = newInterval
|
||||
timers.startTimerWithFixedDelay(Retry, _interval)
|
||||
}
|
||||
}
|
||||
|
||||
def reset(): Unit = {
|
||||
if (_interval ne minBackoff)
|
||||
start()
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
private class ConsumerControllerImpl[A](
|
||||
context: ActorContext[ConsumerControllerImpl.InternalCommand],
|
||||
timers: TimerScheduler[ConsumerControllerImpl.InternalCommand],
|
||||
retryTimer: ConsumerControllerImpl.RetryTimer,
|
||||
stashBuffer: StashBuffer[ConsumerControllerImpl.InternalCommand],
|
||||
settings: ConsumerController.Settings) {
|
||||
|
||||
|
|
@ -230,7 +269,7 @@ private class ConsumerControllerImpl[A](
|
|||
|
||||
private val traceEnabled = context.log.isTraceEnabled
|
||||
|
||||
startRetryTimer()
|
||||
retryTimer.start()
|
||||
|
||||
private def resendLost = !settings.onlyFlowControl
|
||||
|
||||
|
|
@ -245,6 +284,7 @@ private class ConsumerControllerImpl[A](
|
|||
val expectedSeqNr = s.receivedSeqNr + 1
|
||||
|
||||
flightRecorder.consumerReceived(pid, seqNr)
|
||||
retryTimer.reset()
|
||||
|
||||
if (s.isProducerChanged(seqMsg)) {
|
||||
if (seqMsg.first && traceEnabled)
|
||||
|
|
@ -269,6 +309,7 @@ private class ConsumerControllerImpl[A](
|
|||
if (resendLost) {
|
||||
seqMsg.producerController ! Resend(fromSeqNr = expectedSeqNr)
|
||||
stashBuffer.clear()
|
||||
retryTimer.start()
|
||||
resending(s)
|
||||
} else {
|
||||
deliver(s.copy(receivedSeqNr = seqNr), seqMsg)
|
||||
|
|
@ -336,6 +377,7 @@ private class ConsumerControllerImpl[A](
|
|||
// request resend of all unconfirmed, and mark first
|
||||
seqMsg.producerController ! Resend(0)
|
||||
stashBuffer.clear()
|
||||
retryTimer.start()
|
||||
resending(s)
|
||||
} else {
|
||||
context.log.warnN(
|
||||
|
|
@ -469,7 +511,7 @@ private class ConsumerControllerImpl[A](
|
|||
seqNr,
|
||||
newRequestedSeqNr)
|
||||
s.producerController ! Request(confirmedSeqNr = seqNr, newRequestedSeqNr, resendLost, viaTimeout = false)
|
||||
startRetryTimer() // reset interval since Request was just sent
|
||||
retryTimer.start() // reset interval since Request was just sent
|
||||
newRequestedSeqNr
|
||||
} else {
|
||||
if (seqMsg.ack) {
|
||||
|
|
@ -518,7 +560,8 @@ private class ConsumerControllerImpl[A](
|
|||
Behaviors.same
|
||||
|
||||
case Retry =>
|
||||
receiveRetry(s, () => waitingForConfirmation(retryRequest(s), seqMsg))
|
||||
// no retries when waitingForConfirmation, will be performed from (idle) active
|
||||
Behaviors.same
|
||||
|
||||
case start: Start[A] =>
|
||||
start.deliverTo ! Delivery(seqMsg.message, context.self, seqMsg.producerId, seqMsg.seqNr)
|
||||
|
|
@ -542,6 +585,9 @@ private class ConsumerControllerImpl[A](
|
|||
}
|
||||
|
||||
private def receiveRetry(s: State[A], nextBehavior: () => Behavior[InternalCommand]): Behavior[InternalCommand] = {
|
||||
retryTimer.scheduleNext()
|
||||
if (retryTimer.interval() != retryTimer.minBackoff)
|
||||
context.log.debug("Schedule next retry in [{} ms]", retryTimer.interval().toMillis)
|
||||
s.registering match {
|
||||
case None => nextBehavior()
|
||||
case Some(reg) =>
|
||||
|
|
@ -574,6 +620,7 @@ private class ConsumerControllerImpl[A](
|
|||
"Register to new ProducerController [{}], previous was [{}].",
|
||||
reg.producerController,
|
||||
s.producerController)
|
||||
retryTimer.start()
|
||||
reg.producerController ! ProducerController.RegisterConsumer(context.self)
|
||||
nextBehavior(s.copy(registering = Some(reg.producerController)))
|
||||
} else {
|
||||
|
|
@ -602,25 +649,17 @@ private class ConsumerControllerImpl[A](
|
|||
Behaviors.unhandled
|
||||
}
|
||||
|
||||
private def startRetryTimer(): Unit = {
|
||||
timers.startTimerWithFixedDelay(Retry, Retry, settings.resendInterval)
|
||||
}
|
||||
|
||||
// in case the Request or the SequencedMessage triggering the Request is lost
|
||||
private def retryRequest(s: State[A]): State[A] = {
|
||||
if (s.producerController == context.system.deadLetters) {
|
||||
s
|
||||
} else {
|
||||
// TODO #28720 Maybe try to adjust the retry frequency. Maybe some exponential backoff and less need for it when
|
||||
// SequenceMessage are arriving. On the other hand it might be too much overhead to reschedule of each
|
||||
// incoming SequenceMessage.
|
||||
val newRequestedSeqNr = if (resendLost) s.requestedSeqNr else s.receivedSeqNr + flowControlWindow / 2
|
||||
flightRecorder.consumerSentRequest(s.producerId, newRequestedSeqNr)
|
||||
context.log.debug(
|
||||
"Retry sending Request with confirmedSeqNr [{}], requestUpToSeqNr [{}].",
|
||||
s.confirmedSeqNr,
|
||||
newRequestedSeqNr)
|
||||
// TODO #28720 maybe watch the producer to avoid sending retry Request to dead producer
|
||||
s.producerController ! Request(s.confirmedSeqNr, newRequestedSeqNr, resendLost, viaTimeout = true)
|
||||
s.copy(requestedSeqNr = newRequestedSeqNr)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue