typo: resendFirsUnconfirmedIdleTimeout -> resendFirstUnconfirmedIdleTimeout (#31159)
This commit is contained in:
parent
61e44c3bad
commit
9f9bd354a4
3 changed files with 23 additions and 11 deletions
|
|
@ -200,9 +200,13 @@ object ShardingProducerController {
|
||||||
val bufferSize: Int,
|
val bufferSize: Int,
|
||||||
val internalAskTimeout: FiniteDuration,
|
val internalAskTimeout: FiniteDuration,
|
||||||
val cleanupUnusedAfter: FiniteDuration,
|
val cleanupUnusedAfter: FiniteDuration,
|
||||||
val resendFirsUnconfirmedIdleTimeout: FiniteDuration,
|
val resendFirstUnconfirmedIdleTimeout: FiniteDuration,
|
||||||
val producerControllerSettings: ProducerController.Settings) {
|
val producerControllerSettings: ProducerController.Settings) {
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
|
@deprecated("use resendFirstUnconfirmedIdleTimeout", "2.6.19")
|
||||||
|
def resendFirsUnconfirmedIdleTimeout: FiniteDuration = resendFirstUnconfirmedIdleTimeout
|
||||||
|
|
||||||
if (producerControllerSettings.chunkLargeMessagesBytes > 0)
|
if (producerControllerSettings.chunkLargeMessagesBytes > 0)
|
||||||
throw new IllegalArgumentException("Chunked messages not implemented for sharding yet.")
|
throw new IllegalArgumentException("Chunked messages not implemented for sharding yet.")
|
||||||
|
|
||||||
|
|
@ -221,11 +225,19 @@ object ShardingProducerController {
|
||||||
def withCleanupUnusedAfter(newCleanupUnusedAfter: java.time.Duration): Settings =
|
def withCleanupUnusedAfter(newCleanupUnusedAfter: java.time.Duration): Settings =
|
||||||
copy(cleanupUnusedAfter = newCleanupUnusedAfter.asScala)
|
copy(cleanupUnusedAfter = newCleanupUnusedAfter.asScala)
|
||||||
|
|
||||||
def withResendFirsUnconfirmedIdleTimeout(newResendFirsUnconfirmedIdleTimeout: FiniteDuration): Settings =
|
def withResendFirstUnconfirmedIdleTimeout(newResendFirstUnconfirmedIdleTimeout: FiniteDuration): Settings =
|
||||||
copy(resendFirsUnconfirmedIdleTimeout = newResendFirsUnconfirmedIdleTimeout)
|
copy(resendFirstUnconfirmedIdleTimeout = newResendFirstUnconfirmedIdleTimeout)
|
||||||
|
|
||||||
def withResendFirsUnconfirmedIdleTimeout(newResendFirsUnconfirmedIdleTimeout: java.time.Duration): Settings =
|
def withResendFirstUnconfirmedIdleTimeout(newResendFirstUnconfirmedIdleTimeout: java.time.Duration): Settings =
|
||||||
copy(resendFirsUnconfirmedIdleTimeout = newResendFirsUnconfirmedIdleTimeout.asScala)
|
copy(resendFirstUnconfirmedIdleTimeout = newResendFirstUnconfirmedIdleTimeout.asScala)
|
||||||
|
|
||||||
|
@deprecated("use resendFirstUnconfirmedIdleTimeout", "2.6.19")
|
||||||
|
def withResendFirsUnconfirmedIdleTimeout(newResendFirstUnconfirmedIdleTimeout: FiniteDuration): Settings =
|
||||||
|
copy(resendFirstUnconfirmedIdleTimeout = newResendFirstUnconfirmedIdleTimeout)
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
|
def withResendFirsUnconfirmedIdleTimeout(newResendFirstUnconfirmedIdleTimeout: java.time.Duration): Settings =
|
||||||
|
copy(resendFirstUnconfirmedIdleTimeout = newResendFirstUnconfirmedIdleTimeout.asScala)
|
||||||
|
|
||||||
def withProducerControllerSettings(newProducerControllerSettings: ProducerController.Settings): Settings =
|
def withProducerControllerSettings(newProducerControllerSettings: ProducerController.Settings): Settings =
|
||||||
copy(producerControllerSettings = newProducerControllerSettings)
|
copy(producerControllerSettings = newProducerControllerSettings)
|
||||||
|
|
@ -237,17 +249,17 @@ object ShardingProducerController {
|
||||||
bufferSize: Int = bufferSize,
|
bufferSize: Int = bufferSize,
|
||||||
internalAskTimeout: FiniteDuration = internalAskTimeout,
|
internalAskTimeout: FiniteDuration = internalAskTimeout,
|
||||||
cleanupUnusedAfter: FiniteDuration = cleanupUnusedAfter,
|
cleanupUnusedAfter: FiniteDuration = cleanupUnusedAfter,
|
||||||
resendFirsUnconfirmedIdleTimeout: FiniteDuration = resendFirsUnconfirmedIdleTimeout,
|
resendFirstUnconfirmedIdleTimeout: FiniteDuration = resendFirstUnconfirmedIdleTimeout,
|
||||||
producerControllerSettings: ProducerController.Settings = producerControllerSettings) =
|
producerControllerSettings: ProducerController.Settings = producerControllerSettings) =
|
||||||
new Settings(
|
new Settings(
|
||||||
bufferSize,
|
bufferSize,
|
||||||
internalAskTimeout,
|
internalAskTimeout,
|
||||||
cleanupUnusedAfter,
|
cleanupUnusedAfter,
|
||||||
resendFirsUnconfirmedIdleTimeout,
|
resendFirstUnconfirmedIdleTimeout,
|
||||||
producerControllerSettings)
|
producerControllerSettings)
|
||||||
|
|
||||||
override def toString: String =
|
override def toString: String =
|
||||||
s"Settings($bufferSize,$internalAskTimeout,$resendFirsUnconfirmedIdleTimeout,$producerControllerSettings)"
|
s"Settings($bufferSize,$internalAskTimeout,$resendFirstUnconfirmedIdleTimeout,$producerControllerSettings)"
|
||||||
}
|
}
|
||||||
|
|
||||||
def apply[A: ClassTag](
|
def apply[A: ClassTag](
|
||||||
|
|
|
||||||
|
|
@ -142,7 +142,7 @@ import akka.util.Timeout
|
||||||
def becomeActive(p: ActorRef[RequestNext[A]], s: DurableProducerQueue.State[A]): Behavior[InternalCommand] = {
|
def becomeActive(p: ActorRef[RequestNext[A]], s: DurableProducerQueue.State[A]): Behavior[InternalCommand] = {
|
||||||
Behaviors.withTimers { timers =>
|
Behaviors.withTimers { timers =>
|
||||||
timers.startTimerWithFixedDelay(CleanupUnused, settings.cleanupUnusedAfter / 2)
|
timers.startTimerWithFixedDelay(CleanupUnused, settings.cleanupUnusedAfter / 2)
|
||||||
timers.startTimerWithFixedDelay(ResendFirstUnconfirmed, settings.resendFirsUnconfirmedIdleTimeout / 2)
|
timers.startTimerWithFixedDelay(ResendFirstUnconfirmed, settings.resendFirstUnconfirmedIdleTimeout / 2)
|
||||||
|
|
||||||
// resend unconfirmed before other stashed messages
|
// resend unconfirmed before other stashed messages
|
||||||
Behaviors.withStash[InternalCommand](settings.bufferSize) { newStashBuffer =>
|
Behaviors.withStash[InternalCommand](settings.bufferSize) { newStashBuffer =>
|
||||||
|
|
@ -476,7 +476,7 @@ private class ShardingProducerControllerImpl[A: ClassTag](
|
||||||
s.out.foreach {
|
s.out.foreach {
|
||||||
case (outKey: OutKey, outState) =>
|
case (outKey: OutKey, outState) =>
|
||||||
val idleDurationMillis = (now - outState.usedNanoTime) / 1000 / 1000
|
val idleDurationMillis = (now - outState.usedNanoTime) / 1000 / 1000
|
||||||
if (outState.unconfirmed.nonEmpty && idleDurationMillis >= settings.resendFirsUnconfirmedIdleTimeout.toMillis) {
|
if (outState.unconfirmed.nonEmpty && idleDurationMillis >= settings.resendFirstUnconfirmedIdleTimeout.toMillis) {
|
||||||
context.log.debug(
|
context.log.debug(
|
||||||
"Resend first unconfirmed for [{}], because it was idle for [{} ms]",
|
"Resend first unconfirmed for [{}], because it was idle for [{} ms]",
|
||||||
outKey,
|
outKey,
|
||||||
|
|
|
||||||
|
|
@ -352,7 +352,7 @@ class ReliableDeliveryShardingSpec
|
||||||
}))
|
}))
|
||||||
|
|
||||||
val shardingProducerSettings =
|
val shardingProducerSettings =
|
||||||
ShardingProducerController.Settings(system).withResendFirsUnconfirmedIdleTimeout(1500.millis)
|
ShardingProducerController.Settings(system).withResendFirstUnconfirmedIdleTimeout(1500.millis)
|
||||||
val shardingProducerController =
|
val shardingProducerController =
|
||||||
spawn(
|
spawn(
|
||||||
ShardingProducerController[TestConsumer.Job](producerId, region, None, shardingProducerSettings),
|
ShardingProducerController[TestConsumer.Job](producerId, region, None, shardingProducerSettings),
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue