diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/ConsumerControllerSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/ConsumerControllerSpec.scala index 32f15a4677..6f3a62c14b 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/ConsumerControllerSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/ConsumerControllerSpec.scala @@ -4,6 +4,8 @@ package akka.actor.typed.delivery +import scala.concurrent.duration._ + import org.scalatest.wordspec.AnyWordSpecLike import akka.actor.testkit.typed.scaladsl.LogCapturing @@ -12,12 +14,12 @@ import akka.actor.typed.delivery.ConsumerController.DeliverThenStop import akka.actor.typed.delivery.internal.ConsumerControllerImpl import akka.actor.typed.delivery.internal.ProducerControllerImpl -class ConsumerControllerSpec - extends ScalaTestWithActorTestKit(""" - akka.reliable-delivery.consumer-controller.flow-control-window = 20 - """) - with AnyWordSpecLike - with LogCapturing { +class ConsumerControllerSpec extends ScalaTestWithActorTestKit(""" + akka.reliable-delivery.consumer-controller { + flow-control-window = 20 + resend-interval-min = 1s + } + """) with AnyWordSpecLike with LogCapturing { import TestConsumer.sequencedMessage private var idCount = 0 @@ -62,6 +64,7 @@ class ConsumerControllerSpec val producerControllerProbe2 = createTestProbe[ProducerControllerImpl.InternalCommand]() consumerController ! ConsumerController.RegisterToProducerController(producerControllerProbe2.ref) producerControllerProbe2.expectMessage(ProducerController.RegisterConsumer(consumerController)) + consumerProbe.receiveMessage().confirmTo ! ConsumerController.Confirmed // expected resend producerControllerProbe2.expectMessage(ProducerController.RegisterConsumer(consumerController)) @@ -80,14 +83,12 @@ class ConsumerControllerSpec val consumerProbe = createTestProbe[ConsumerController.Delivery[TestConsumer.Job]]() consumerController ! ConsumerController.Start(consumerProbe.ref) - consumerProbe.expectMessageType[ConsumerController.Delivery[TestConsumer.Job]] - producerControllerProbe.expectMessage(ProducerControllerImpl.Request(0, 20, true, false)) - producerControllerProbe.expectMessage(ProducerControllerImpl.Request(0, 20, true, true)) - consumerController ! ConsumerController.Confirmed producerControllerProbe.expectMessage(ProducerControllerImpl.Request(1, 20, true, false)) + producerControllerProbe.expectMessage(ProducerControllerImpl.Request(1, 20, true, true)) + testKit.stop(consumerController) } @@ -185,15 +186,17 @@ class ConsumerControllerSpec consumerController ! sequencedMessage(producerId, 2, producerControllerProbe.ref) consumerProbe.expectMessageType[ConsumerController.Delivery[TestConsumer.Job]] consumerController ! ConsumerController.Confirmed + producerControllerProbe.expectMessage(ProducerControllerImpl.Request(2, 20, true, true)) consumerController ! sequencedMessage(producerId, 3, producerControllerProbe.ref) consumerProbe.expectMessageType[ConsumerController.Delivery[TestConsumer.Job]].seqNr should ===(3) - - producerControllerProbe.expectMessage(ProducerControllerImpl.Request(2, 20, true, true)) - consumerController ! ConsumerController.Confirmed producerControllerProbe.expectMessage(ProducerControllerImpl.Request(3, 20, true, true)) + // exponential back, so now it should be more than 1 sec + producerControllerProbe.expectNoMessage(1.1.second) + producerControllerProbe.expectMessage(ProducerControllerImpl.Request(3, 20, true, true)) + testKit.stop(consumerController) } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/DurableProducerControllerSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/DurableProducerControllerSpec.scala index d09a8aa488..6fffc2bb2d 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/DurableProducerControllerSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/DurableProducerControllerSpec.scala @@ -19,6 +19,7 @@ import akka.actor.typed.delivery.internal.ProducerControllerImpl class DurableProducerControllerSpec extends ScalaTestWithActorTestKit(""" akka.reliable-delivery.consumer-controller.flow-control-window = 20 + akka.reliable-delivery.consumer-controller.resend-interval-min = 1s """) with AnyWordSpecLike with LogCapturing { diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/ReliableDeliveryRandomSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/ReliableDeliveryRandomSpec.scala index ff7839ae79..9ef9fd1fd9 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/ReliableDeliveryRandomSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/ReliableDeliveryRandomSpec.scala @@ -42,12 +42,13 @@ object ReliableDeliveryRandomSpec { } } -class ReliableDeliveryRandomSpec - extends ScalaTestWithActorTestKit(""" - akka.reliable-delivery.consumer-controller.flow-control-window = 20 - """) - with AnyWordSpecLike - with LogCapturing { +class ReliableDeliveryRandomSpec extends ScalaTestWithActorTestKit(""" + akka.reliable-delivery.consumer-controller { + flow-control-window = 20 + resend-interval-min = 500 ms + resend-interval-max = 2 s + } + """) with AnyWordSpecLike with LogCapturing { import ReliableDeliveryRandomSpec._ private var idCount = 0 diff --git a/akka-actor-typed/src/main/mima-filters/2.6.4.backwards.excludes/reliable-delivery.excludes b/akka-actor-typed/src/main/mima-filters/2.6.4.backwards.excludes/reliable-delivery.excludes index e59db0fb17..07a92c4b2e 100644 --- a/akka-actor-typed/src/main/mima-filters/2.6.4.backwards.excludes/reliable-delivery.excludes +++ b/akka-actor-typed/src/main/mima-filters/2.6.4.backwards.excludes/reliable-delivery.excludes @@ -1,2 +1,9 @@ # Changes to internals of reliable delivery ProblemFilters.exclude[Problem]("akka.actor.typed.delivery.internal.*") + +# #28720 Dynamically adjust the ConsumerController's resend interval +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.delivery.ConsumerController#Settings.resendInterval") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.delivery.ConsumerController#Settings.withResendInterval") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.delivery.ConsumerController#Settings.withResendInterval") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.delivery.ConsumerController#Settings.getResendInterval") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.delivery.ConsumerController#Settings.this") diff --git a/akka-actor-typed/src/main/resources/reference.conf b/akka-actor-typed/src/main/resources/reference.conf index b3f085dfb5..f3f36a1f40 100644 --- a/akka-actor-typed/src/main/resources/reference.conf +++ b/akka-actor-typed/src/main/resources/reference.conf @@ -90,8 +90,10 @@ akka.reliable-delivery { flow-control-window = 50 # The ConsumerController resends flow control messages to the - # ProducerController with this interval. - resend-interval = 1s + # ProducerController with the resend-interval-min, and increasing + # it gradually to resend-interval-max when idle. + resend-interval-min = 2s + resend-interval-max = 30s # If this is enabled lost messages will not be resent, but flow control is used. # This can be more efficient since messages don't have to be diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/ConsumerController.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/ConsumerController.scala index 7b587fc84c..7df92fe216 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/ConsumerController.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/ConsumerController.scala @@ -162,7 +162,8 @@ object ConsumerController { def apply(config: Config): Settings = { new Settings( flowControlWindow = config.getInt("flow-control-window"), - resendInterval = config.getDuration("resend-interval").asScala, + resendIntervalMin = config.getDuration("resend-interval-min").asScala, + resendIntervalMax = config.getDuration("resend-interval-max").asScala, onlyFlowControl = config.getBoolean("only-flow-control")) } @@ -183,7 +184,8 @@ object ConsumerController { final class Settings private ( val flowControlWindow: Int, - val resendInterval: FiniteDuration, + val resendIntervalMin: FiniteDuration, + val resendIntervalMax: FiniteDuration, val onlyFlowControl: Boolean) { def withFlowControlWindow(newFlowControlWindow: Int): Settings = @@ -192,20 +194,32 @@ object ConsumerController { /** * Scala API */ - def withResendInterval(newResendInterval: FiniteDuration): Settings = - copy(resendInterval = newResendInterval) + def withResendIntervalMin(newResendIntervalMin: FiniteDuration): Settings = + copy(resendIntervalMin = newResendIntervalMin) + + /** + * Scala API + */ + def withResendIntervalMax(newResendIntervalMax: FiniteDuration): Settings = + copy(resendIntervalMax = newResendIntervalMax) /** * Java API */ - def withResendInterval(newResendInterval: JavaDuration): Settings = - copy(resendInterval = newResendInterval.asScala) + def withResendIntervalMin(newResendIntervalMin: JavaDuration): Settings = + copy(resendIntervalMin = newResendIntervalMin.asScala) /** * Java API */ - def getResendInterval(): JavaDuration = - resendInterval.asJava + def withResendIntervalMax(newResendIntervalMax: JavaDuration): Settings = + copy(resendIntervalMax = newResendIntervalMax.asScala) + + /** + * Java API + */ + def getResendIntervalMax(): JavaDuration = + resendIntervalMax.asJava def withOnlyFlowControl(newOnlyFlowControl: Boolean): Settings = copy(onlyFlowControl = newOnlyFlowControl) @@ -215,12 +229,13 @@ object ConsumerController { */ private def copy( flowControlWindow: Int = flowControlWindow, - resendInterval: FiniteDuration = resendInterval, + resendIntervalMin: FiniteDuration = resendIntervalMin, + resendIntervalMax: FiniteDuration = resendIntervalMax, onlyFlowControl: Boolean = onlyFlowControl) = - new Settings(flowControlWindow, resendInterval, onlyFlowControl) + new Settings(flowControlWindow, resendIntervalMin, resendIntervalMax, onlyFlowControl) override def toString: String = - s"Settings($flowControlWindow, $resendInterval, $onlyFlowControl)" + s"Settings($flowControlWindow, $resendIntervalMin, $onlyFlowControl)" } def apply[A](): Behavior[Command[A]] = diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ConsumerControllerImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ConsumerControllerImpl.scala index 1649ef6bcb..362b6efc87 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ConsumerControllerImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ConsumerControllerImpl.scala @@ -4,6 +4,8 @@ package akka.actor.typed.delivery.internal +import scala.concurrent.duration.FiniteDuration + import akka.actor.typed.ActorRef import akka.actor.typed.Behavior import akka.actor.typed.PostStop @@ -128,8 +130,9 @@ import akka.util.ConstantFun.scalaIdentityFunction context.watchWith(s.deliverTo, ConsumerTerminated(s.deliverTo)) flightRecorder.consumerStarted(context.self.path) + val retryTimer = new RetryTimer(timers, settings.resendIntervalMin, settings.resendIntervalMax) val activeBehavior = - new ConsumerControllerImpl[A](context, timers, stashBuffer, settings) + new ConsumerControllerImpl[A](context, retryTimer, stashBuffer, settings) .active(initialState(context, s, registering, stopping)) context.log.debug("Received Start, unstash [{}] messages.", stashBuffer.size) stashBuffer.unstash(activeBehavior, 1, scalaIdentityFunction) @@ -171,7 +174,7 @@ import akka.util.ConstantFun.scalaIdentityFunction } - timers.startTimerWithFixedDelay(Retry, Retry, settings.resendInterval) + timers.startTimerWithFixedDelay(Retry, Retry, settings.resendIntervalMin) waitForStart(None, stopping = false) } } @@ -207,11 +210,47 @@ import akka.util.ConstantFun.scalaIdentityFunction if (ref.path.address.hasGlobalScope) throw new IllegalArgumentException(s"Consumer [$ref] should be local.") } + + private class RetryTimer( + timers: TimerScheduler[ConsumerControllerImpl.InternalCommand], + val minBackoff: FiniteDuration, + maxBackoff: FiniteDuration) { + private var _interval = minBackoff + + def interval(): FiniteDuration = + _interval + + def start(): Unit = { + _interval = minBackoff + timers.startTimerWithFixedDelay(Retry, _interval) + } + + def scheduleNext(): Unit = { + val newInterval = + if (_interval eq maxBackoff) + maxBackoff + else + maxBackoff.min(_interval * 1.5) match { + case f: FiniteDuration => f + case _ => maxBackoff + } + if (newInterval != _interval) { + _interval = newInterval + timers.startTimerWithFixedDelay(Retry, _interval) + } + } + + def reset(): Unit = { + if (_interval ne minBackoff) + start() + } + + } } private class ConsumerControllerImpl[A]( context: ActorContext[ConsumerControllerImpl.InternalCommand], - timers: TimerScheduler[ConsumerControllerImpl.InternalCommand], + retryTimer: ConsumerControllerImpl.RetryTimer, stashBuffer: StashBuffer[ConsumerControllerImpl.InternalCommand], settings: ConsumerController.Settings) { @@ -230,7 +269,7 @@ private class ConsumerControllerImpl[A]( private val traceEnabled = context.log.isTraceEnabled - startRetryTimer() + retryTimer.start() private def resendLost = !settings.onlyFlowControl @@ -245,6 +284,7 @@ private class ConsumerControllerImpl[A]( val expectedSeqNr = s.receivedSeqNr + 1 flightRecorder.consumerReceived(pid, seqNr) + retryTimer.reset() if (s.isProducerChanged(seqMsg)) { if (seqMsg.first && traceEnabled) @@ -269,6 +309,7 @@ private class ConsumerControllerImpl[A]( if (resendLost) { seqMsg.producerController ! Resend(fromSeqNr = expectedSeqNr) stashBuffer.clear() + retryTimer.start() resending(s) } else { deliver(s.copy(receivedSeqNr = seqNr), seqMsg) @@ -336,6 +377,7 @@ private class ConsumerControllerImpl[A]( // request resend of all unconfirmed, and mark first seqMsg.producerController ! Resend(0) stashBuffer.clear() + retryTimer.start() resending(s) } else { context.log.warnN( @@ -469,7 +511,7 @@ private class ConsumerControllerImpl[A]( seqNr, newRequestedSeqNr) s.producerController ! Request(confirmedSeqNr = seqNr, newRequestedSeqNr, resendLost, viaTimeout = false) - startRetryTimer() // reset interval since Request was just sent + retryTimer.start() // reset interval since Request was just sent newRequestedSeqNr } else { if (seqMsg.ack) { @@ -518,7 +560,8 @@ private class ConsumerControllerImpl[A]( Behaviors.same case Retry => - receiveRetry(s, () => waitingForConfirmation(retryRequest(s), seqMsg)) + // no retries when waitingForConfirmation, will be performed from (idle) active + Behaviors.same case start: Start[A] => start.deliverTo ! Delivery(seqMsg.message, context.self, seqMsg.producerId, seqMsg.seqNr) @@ -542,6 +585,9 @@ private class ConsumerControllerImpl[A]( } private def receiveRetry(s: State[A], nextBehavior: () => Behavior[InternalCommand]): Behavior[InternalCommand] = { + retryTimer.scheduleNext() + if (retryTimer.interval() != retryTimer.minBackoff) + context.log.debug("Schedule next retry in [{} ms]", retryTimer.interval().toMillis) s.registering match { case None => nextBehavior() case Some(reg) => @@ -574,6 +620,7 @@ private class ConsumerControllerImpl[A]( "Register to new ProducerController [{}], previous was [{}].", reg.producerController, s.producerController) + retryTimer.start() reg.producerController ! ProducerController.RegisterConsumer(context.self) nextBehavior(s.copy(registering = Some(reg.producerController))) } else { @@ -602,25 +649,17 @@ private class ConsumerControllerImpl[A]( Behaviors.unhandled } - private def startRetryTimer(): Unit = { - timers.startTimerWithFixedDelay(Retry, Retry, settings.resendInterval) - } - // in case the Request or the SequencedMessage triggering the Request is lost private def retryRequest(s: State[A]): State[A] = { if (s.producerController == context.system.deadLetters) { s } else { - // TODO #28720 Maybe try to adjust the retry frequency. Maybe some exponential backoff and less need for it when - // SequenceMessage are arriving. On the other hand it might be too much overhead to reschedule of each - // incoming SequenceMessage. val newRequestedSeqNr = if (resendLost) s.requestedSeqNr else s.receivedSeqNr + flowControlWindow / 2 flightRecorder.consumerSentRequest(s.producerId, newRequestedSeqNr) context.log.debug( "Retry sending Request with confirmedSeqNr [{}], requestUpToSeqNr [{}].", s.confirmedSeqNr, newRequestedSeqNr) - // TODO #28720 maybe watch the producer to avoid sending retry Request to dead producer s.producerController ! Request(s.confirmedSeqNr, newRequestedSeqNr, resendLost, viaTimeout = true) s.copy(requestedSeqNr = newRequestedSeqNr) }