+per #16348 Limit the number of messages redelivered at each interval
Helps to prevent flooding destinations which are unavailable for a long time with messages once they become available.
This commit is contained in:
parent
c73603f707
commit
57f67bc58c
6 changed files with 80 additions and 14 deletions
|
|
@ -448,6 +448,13 @@ The interval between redelivery attempts is defined by the ``redeliverInterval``
|
|||
The default value can be configured with the ``akka.persistence.at-least-once-delivery.redeliver-interval``
|
||||
configuration key. The method can be overridden by implementation classes to return non-default values.
|
||||
|
||||
The maximum number of messages that will be sent at each redelivery burst is defined by the
|
||||
``redeliveryBurstLimit`` method (burst frequency is half of the redelivery interval). If there's a lot of
|
||||
unconfirmed messages (e.g. if the destination is not available for a long time), this helps to prevent an overwhelming
|
||||
amount of messages to be sent at once. The default value can be configured with the
|
||||
``akka.persistence.at-least-once-delivery.redelivery-burst-limit`` configuration key. The method can be overridden
|
||||
by implementation classes to return non-default values.
|
||||
|
||||
After a number of delivery attempts a ``AtLeastOnceDelivery.UnconfirmedWarning`` message
|
||||
will be sent to ``self``. The re-sending will still continue, but you can choose to call
|
||||
``confirmDelivery`` to cancel the re-sending. The number of delivery attempts before emitting the
|
||||
|
|
|
|||
|
|
@ -446,6 +446,13 @@ The interval between redelivery attempts is defined by the ``redeliverInterval``
|
|||
The default value can be configured with the ``akka.persistence.at-least-once-delivery.redeliver-interval``
|
||||
configuration key. The method can be overridden by implementation classes to return non-default values.
|
||||
|
||||
The maximum number of messages that will be sent at each redelivery burst is defined by the
|
||||
``redeliveryBurstLimit`` method (burst frequency is half of the redelivery interval). If there's a lot of
|
||||
unconfirmed messages (e.g. if the destination is not available for a long time), this helps to prevent an overwhelming
|
||||
amount of messages to be sent at once. The default value can be configured with the
|
||||
``akka.persistence.at-least-once-delivery.redelivery-burst-limit`` configuration key. The method can be overridden
|
||||
by implementation classes to return non-default values.
|
||||
|
||||
After a number of delivery attempts a ``AtLeastOnceDelivery.UnconfirmedWarning`` message
|
||||
will be sent to ``self``. The re-sending will still continue, but you can choose to call
|
||||
``confirmDelivery`` to cancel the re-sending. The number of delivery attempts before emitting the
|
||||
|
|
|
|||
|
|
@ -146,6 +146,9 @@ akka {
|
|||
at-least-once-delivery {
|
||||
# Interval between redelivery attempts
|
||||
redeliver-interval = 5s
|
||||
|
||||
# Maximum number of unconfirmed messages that will be sent in one redelivery burst
|
||||
redelivery-burst-limit = 10000
|
||||
|
||||
# After this number of delivery attempts a `ReliableRedelivery.UnconfirmedWarning`
|
||||
# message will be sent to the actor.
|
||||
|
|
|
|||
|
|
@ -122,6 +122,22 @@ trait AtLeastOnceDelivery extends Processor {
|
|||
private val defaultRedeliverInterval: FiniteDuration =
|
||||
Persistence(context.system).settings.atLeastOnceDelivery.redeliverInterval
|
||||
|
||||
/**
|
||||
* Maximum number of unconfirmed messages that will be sent at each redelivery burst
|
||||
* (burst frequency is half of the redelivery interval).
|
||||
* If there's a lot of unconfirmed messages (e.g. if the destination is not available for a long time),
|
||||
* this helps to prevent an overwhelming amount of messages to be sent at once.
|
||||
*
|
||||
* The default value can be configured with the
|
||||
* `akka.persistence.at-least-once-delivery.redelivery-burst-limit`
|
||||
* configuration key. This method can be overridden by implementation classes to return
|
||||
* non-default values.
|
||||
*/
|
||||
def redeliveryBurstLimit: Int = defaultRedeliveryBurstLimit
|
||||
|
||||
private val defaultRedeliveryBurstLimit: Int =
|
||||
Persistence(context.system).settings.atLeastOnceDelivery.redeliveryBurstLimit
|
||||
|
||||
/**
|
||||
* After this number of delivery attempts a [[AtLeastOnceDelivery.UnconfirmedWarning]] message
|
||||
* will be sent to `self`. The count is reset after a restart.
|
||||
|
|
@ -223,15 +239,17 @@ trait AtLeastOnceDelivery extends Processor {
|
|||
val deadline = now - redeliverInterval.toNanos
|
||||
var warnings = Vector.empty[UnconfirmedDelivery]
|
||||
|
||||
unconfirmed foreach {
|
||||
case (deliveryId, delivery) ⇒
|
||||
if (delivery.timestamp <= deadline) {
|
||||
unconfirmed
|
||||
.iterator
|
||||
.filter { case (_, delivery) ⇒ delivery.timestamp <= deadline }
|
||||
.take(redeliveryBurstLimit)
|
||||
.foreach {
|
||||
case (deliveryId, delivery) ⇒
|
||||
send(deliveryId, delivery, now)
|
||||
|
||||
if (delivery.attempt == warnAfterNumberOfUnconfirmedAttempts)
|
||||
warnings :+= UnconfirmedDelivery(deliveryId, delivery.destination, delivery.message)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (warnings.nonEmpty)
|
||||
self ! UnconfirmedWarning(warnings)
|
||||
|
|
|
|||
|
|
@ -47,6 +47,9 @@ final class PersistenceSettings(config: Config) {
|
|||
val redeliverInterval: FiniteDuration =
|
||||
config.getMillisDuration("at-least-once-delivery.redeliver-interval")
|
||||
|
||||
val redeliveryBurstLimit: Int =
|
||||
config.getInt("at-least-once-delivery.redelivery-burst-limit")
|
||||
|
||||
val warnAfterNumberOfUnconfirmedAttempts: Int =
|
||||
config.getInt("at-least-once-delivery.warn-after-number-of-unconfirmed-attempts")
|
||||
|
||||
|
|
|
|||
|
|
@ -29,13 +29,15 @@ object AtLeastOnceDeliverySpec {
|
|||
|
||||
def senderProps(testActor: ActorRef, name: String,
|
||||
redeliverInterval: FiniteDuration, warnAfterNumberOfUnconfirmedAttempts: Int,
|
||||
async: Boolean, destinations: Map[String, ActorPath]): Props =
|
||||
Props(new Sender(testActor, name, redeliverInterval, warnAfterNumberOfUnconfirmedAttempts, async, destinations))
|
||||
redeliveryBurstLimit: Int, async: Boolean, destinations: Map[String, ActorPath]): Props =
|
||||
Props(new Sender(testActor, name, redeliverInterval, warnAfterNumberOfUnconfirmedAttempts,
|
||||
redeliveryBurstLimit, async, destinations))
|
||||
|
||||
class Sender(testActor: ActorRef,
|
||||
name: String,
|
||||
override val redeliverInterval: FiniteDuration,
|
||||
override val warnAfterNumberOfUnconfirmedAttempts: Int,
|
||||
override val redeliveryBurstLimit: Int,
|
||||
async: Boolean,
|
||||
destinations: Map[String, ActorPath])
|
||||
extends PersistentActor with AtLeastOnceDelivery with ActorLogging {
|
||||
|
|
@ -154,7 +156,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config)
|
|||
"deliver messages in order when nothing is lost" in {
|
||||
val probeA = TestProbe()
|
||||
val destinations = Map("A" -> system.actorOf(destinationProps(probeA.ref)).path)
|
||||
val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, async = false, destinations), name)
|
||||
val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, 1000, async = false, destinations), name)
|
||||
snd ! Req("a")
|
||||
expectMsg(ReqAck)
|
||||
probeA.expectMsg(Action(1, "a"))
|
||||
|
|
@ -165,7 +167,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config)
|
|||
val probeA = TestProbe()
|
||||
val dst = system.actorOf(destinationProps(probeA.ref))
|
||||
val destinations = Map("A" -> system.actorOf(unreliableProps(3, dst)).path)
|
||||
val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, async = false, destinations), name)
|
||||
val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, 1000, async = false, destinations), name)
|
||||
snd ! Req("a-1")
|
||||
expectMsg(ReqAck)
|
||||
probeA.expectMsg(Action(1, "a-1"))
|
||||
|
|
@ -189,7 +191,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config)
|
|||
val probeA = TestProbe()
|
||||
val dst = system.actorOf(destinationProps(probeA.ref))
|
||||
val destinations = Map("A" -> system.actorOf(unreliableProps(3, dst)).path)
|
||||
val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, async = false, destinations), name)
|
||||
val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, 1000, async = false, destinations), name)
|
||||
snd ! Req("a-1")
|
||||
expectMsg(ReqAck)
|
||||
probeA.expectMsg(Action(1, "a-1"))
|
||||
|
|
@ -222,7 +224,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config)
|
|||
val probeA = TestProbe()
|
||||
val dst = system.actorOf(destinationProps(probeA.ref))
|
||||
val destinations = Map("A" -> system.actorOf(unreliableProps(2, dst)).path)
|
||||
val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, async = false, destinations), name)
|
||||
val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, 1000, async = false, destinations), name)
|
||||
snd ! Req("a-1")
|
||||
expectMsg(ReqAck)
|
||||
probeA.expectMsg(Action(1, "a-1"))
|
||||
|
|
@ -257,7 +259,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config)
|
|||
val probeA = TestProbe()
|
||||
val dst = system.actorOf(destinationProps(probeA.ref))
|
||||
val destinations = Map("A" -> system.actorOf(unreliableProps(3, dst)).path)
|
||||
val snd = system.actorOf(senderProps(testActor, name, 1000.millis, 5, async = false, destinations), name)
|
||||
val snd = system.actorOf(senderProps(testActor, name, 1000.millis, 5, 1000, async = false, destinations), name)
|
||||
snd ! Req("a-1")
|
||||
expectMsg(ReqAck)
|
||||
probeA.expectMsg(Action(1, "a-1"))
|
||||
|
|
@ -294,7 +296,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config)
|
|||
val probeA = TestProbe()
|
||||
val probeB = TestProbe()
|
||||
val destinations = Map("A" -> probeA.ref.path, "B" -> probeB.ref.path)
|
||||
val snd = system.actorOf(senderProps(testActor, name, 500.millis, 3, async = false, destinations), name)
|
||||
val snd = system.actorOf(senderProps(testActor, name, 500.millis, 3, 1000, async = false, destinations), name)
|
||||
snd ! Req("a-1")
|
||||
snd ! Req("b-1")
|
||||
snd ! Req("b-2")
|
||||
|
|
@ -320,7 +322,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config)
|
|||
"A" -> system.actorOf(unreliableProps(2, dstA), "unreliable-a").path,
|
||||
"B" -> system.actorOf(unreliableProps(5, dstB), "unreliable-b").path,
|
||||
"C" -> system.actorOf(unreliableProps(3, dstC), "unreliable-c").path)
|
||||
val snd = system.actorOf(senderProps(testActor, name, 1000.millis, 5, async = true, destinations), name)
|
||||
val snd = system.actorOf(senderProps(testActor, name, 1000.millis, 5, 1000, async = true, destinations), name)
|
||||
val N = 100
|
||||
for (n ← 1 to N) {
|
||||
snd ! Req("a-" + n)
|
||||
|
|
@ -337,6 +339,32 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config)
|
|||
probeC.receiveN(N, deliverWithin).map { case a: Action ⇒ a.payload }.toSet should be((1 to N).map(n ⇒ "c-" + n).toSet)
|
||||
}
|
||||
|
||||
"limit the number of messages redelivered at once" in {
|
||||
val probeA = TestProbe()
|
||||
val dst = system.actorOf(destinationProps(probeA.ref))
|
||||
val destinations = Map("A" -> system.actorOf(unreliableProps(2, dst)).path)
|
||||
|
||||
val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, 2, async = true, destinations), name)
|
||||
|
||||
val N = 10
|
||||
for (n ← 1 to N) {
|
||||
snd ! Req("a-" + n)
|
||||
}
|
||||
|
||||
// initially all odd messages should go through
|
||||
for (n ← 1 to N if n % 2 == 1) probeA.expectMsg(Action(n, s"a-$n"))
|
||||
probeA.expectNoMsg(100.millis)
|
||||
|
||||
// at each redelivery round, 2 (even) messages are sent, the first goes through
|
||||
// without throttling, at each round half of the messages would go through
|
||||
var toDeliver = (1 to N).filter(_ % 2 == 0).map(_.toLong).toSet
|
||||
for (n ← 1 to N if n % 2 == 0) {
|
||||
toDeliver -= probeA.expectMsgType[Action].id
|
||||
probeA.expectNoMsg(100.millis)
|
||||
}
|
||||
|
||||
toDeliver should be(Set.empty)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue