From 57f67bc58c5b93d62cdd025969c3e6117b8b0399 Mon Sep 17 00:00:00 2001 From: adamw Date: Tue, 25 Nov 2014 11:34:43 +0100 Subject: [PATCH] +per #16348 Limit the number of messages redelivered at each interval Helps to prevent flooding destinations which are unavailable for a long time with messages once they become available. --- akka-docs/rst/java/persistence.rst | 7 +++ akka-docs/rst/scala/persistence.rst | 7 +++ .../src/main/resources/reference.conf | 3 ++ .../persistence/AtLeastOnceDelivery.scala | 28 +++++++++-- .../scala/akka/persistence/Persistence.scala | 3 ++ .../persistence/AtLeastOnceDeliverySpec.scala | 46 +++++++++++++++---- 6 files changed, 80 insertions(+), 14 deletions(-) diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index e728e15f4c..b41ec9fb97 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -448,6 +448,13 @@ The interval between redelivery attempts is defined by the ``redeliverInterval`` The default value can be configured with the ``akka.persistence.at-least-once-delivery.redeliver-interval`` configuration key. The method can be overridden by implementation classes to return non-default values. +The maximum number of messages that will be sent at each redelivery burst is defined by the +``redeliveryBurstLimit`` method (burst frequency is half of the redelivery interval). If there's a lot of +unconfirmed messages (e.g. if the destination is not available for a long time), this helps to prevent an overwhelming +amount of messages to be sent at once. The default value can be configured with the +``akka.persistence.at-least-once-delivery.redelivery-burst-limit`` configuration key. The method can be overridden +by implementation classes to return non-default values. + After a number of delivery attempts a ``AtLeastOnceDelivery.UnconfirmedWarning`` message will be sent to ``self``. The re-sending will still continue, but you can choose to call ``confirmDelivery`` to cancel the re-sending. The number of delivery attempts before emitting the diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 0db7cc772f..f4c851ce40 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -446,6 +446,13 @@ The interval between redelivery attempts is defined by the ``redeliverInterval`` The default value can be configured with the ``akka.persistence.at-least-once-delivery.redeliver-interval`` configuration key. The method can be overridden by implementation classes to return non-default values. +The maximum number of messages that will be sent at each redelivery burst is defined by the +``redeliveryBurstLimit`` method (burst frequency is half of the redelivery interval). If there's a lot of +unconfirmed messages (e.g. if the destination is not available for a long time), this helps to prevent an overwhelming +amount of messages to be sent at once. The default value can be configured with the +``akka.persistence.at-least-once-delivery.redelivery-burst-limit`` configuration key. The method can be overridden +by implementation classes to return non-default values. + After a number of delivery attempts a ``AtLeastOnceDelivery.UnconfirmedWarning`` message will be sent to ``self``. The re-sending will still continue, but you can choose to call ``confirmDelivery`` to cancel the re-sending. The number of delivery attempts before emitting the diff --git a/akka-persistence/src/main/resources/reference.conf b/akka-persistence/src/main/resources/reference.conf index 9a4228c8b2..9de5404a3e 100644 --- a/akka-persistence/src/main/resources/reference.conf +++ b/akka-persistence/src/main/resources/reference.conf @@ -146,6 +146,9 @@ akka { at-least-once-delivery { # Interval between redelivery attempts redeliver-interval = 5s + + # Maximum number of unconfirmed messages that will be sent in one redelivery burst + redelivery-burst-limit = 10000 # After this number of delivery attempts a `ReliableRedelivery.UnconfirmedWarning` # message will be sent to the actor. diff --git a/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala b/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala index 2fac68cdff..95ec317e8a 100644 --- a/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala +++ b/akka-persistence/src/main/scala/akka/persistence/AtLeastOnceDelivery.scala @@ -122,6 +122,22 @@ trait AtLeastOnceDelivery extends Processor { private val defaultRedeliverInterval: FiniteDuration = Persistence(context.system).settings.atLeastOnceDelivery.redeliverInterval + /** + * Maximum number of unconfirmed messages that will be sent at each redelivery burst + * (burst frequency is half of the redelivery interval). + * If there's a lot of unconfirmed messages (e.g. if the destination is not available for a long time), + * this helps to prevent an overwhelming amount of messages to be sent at once. + * + * The default value can be configured with the + * `akka.persistence.at-least-once-delivery.redelivery-burst-limit` + * configuration key. This method can be overridden by implementation classes to return + * non-default values. + */ + def redeliveryBurstLimit: Int = defaultRedeliveryBurstLimit + + private val defaultRedeliveryBurstLimit: Int = + Persistence(context.system).settings.atLeastOnceDelivery.redeliveryBurstLimit + /** * After this number of delivery attempts a [[AtLeastOnceDelivery.UnconfirmedWarning]] message * will be sent to `self`. The count is reset after a restart. @@ -223,15 +239,17 @@ trait AtLeastOnceDelivery extends Processor { val deadline = now - redeliverInterval.toNanos var warnings = Vector.empty[UnconfirmedDelivery] - unconfirmed foreach { - case (deliveryId, delivery) ⇒ - if (delivery.timestamp <= deadline) { + unconfirmed + .iterator + .filter { case (_, delivery) ⇒ delivery.timestamp <= deadline } + .take(redeliveryBurstLimit) + .foreach { + case (deliveryId, delivery) ⇒ send(deliveryId, delivery, now) if (delivery.attempt == warnAfterNumberOfUnconfirmedAttempts) warnings :+= UnconfirmedDelivery(deliveryId, delivery.destination, delivery.message) - } - } + } if (warnings.nonEmpty) self ! UnconfirmedWarning(warnings) diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala index 245dc2f68b..b341d605f0 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala @@ -47,6 +47,9 @@ final class PersistenceSettings(config: Config) { val redeliverInterval: FiniteDuration = config.getMillisDuration("at-least-once-delivery.redeliver-interval") + val redeliveryBurstLimit: Int = + config.getInt("at-least-once-delivery.redelivery-burst-limit") + val warnAfterNumberOfUnconfirmedAttempts: Int = config.getInt("at-least-once-delivery.warn-after-number-of-unconfirmed-attempts") diff --git a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala index c28a0a4b61..29736a8fc6 100644 --- a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala @@ -29,13 +29,15 @@ object AtLeastOnceDeliverySpec { def senderProps(testActor: ActorRef, name: String, redeliverInterval: FiniteDuration, warnAfterNumberOfUnconfirmedAttempts: Int, - async: Boolean, destinations: Map[String, ActorPath]): Props = - Props(new Sender(testActor, name, redeliverInterval, warnAfterNumberOfUnconfirmedAttempts, async, destinations)) + redeliveryBurstLimit: Int, async: Boolean, destinations: Map[String, ActorPath]): Props = + Props(new Sender(testActor, name, redeliverInterval, warnAfterNumberOfUnconfirmedAttempts, + redeliveryBurstLimit, async, destinations)) class Sender(testActor: ActorRef, name: String, override val redeliverInterval: FiniteDuration, override val warnAfterNumberOfUnconfirmedAttempts: Int, + override val redeliveryBurstLimit: Int, async: Boolean, destinations: Map[String, ActorPath]) extends PersistentActor with AtLeastOnceDelivery with ActorLogging { @@ -154,7 +156,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config) "deliver messages in order when nothing is lost" in { val probeA = TestProbe() val destinations = Map("A" -> system.actorOf(destinationProps(probeA.ref)).path) - val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, async = false, destinations), name) + val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, 1000, async = false, destinations), name) snd ! Req("a") expectMsg(ReqAck) probeA.expectMsg(Action(1, "a")) @@ -165,7 +167,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config) val probeA = TestProbe() val dst = system.actorOf(destinationProps(probeA.ref)) val destinations = Map("A" -> system.actorOf(unreliableProps(3, dst)).path) - val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, async = false, destinations), name) + val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, 1000, async = false, destinations), name) snd ! Req("a-1") expectMsg(ReqAck) probeA.expectMsg(Action(1, "a-1")) @@ -189,7 +191,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config) val probeA = TestProbe() val dst = system.actorOf(destinationProps(probeA.ref)) val destinations = Map("A" -> system.actorOf(unreliableProps(3, dst)).path) - val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, async = false, destinations), name) + val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, 1000, async = false, destinations), name) snd ! Req("a-1") expectMsg(ReqAck) probeA.expectMsg(Action(1, "a-1")) @@ -222,7 +224,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config) val probeA = TestProbe() val dst = system.actorOf(destinationProps(probeA.ref)) val destinations = Map("A" -> system.actorOf(unreliableProps(2, dst)).path) - val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, async = false, destinations), name) + val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, 1000, async = false, destinations), name) snd ! Req("a-1") expectMsg(ReqAck) probeA.expectMsg(Action(1, "a-1")) @@ -257,7 +259,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config) val probeA = TestProbe() val dst = system.actorOf(destinationProps(probeA.ref)) val destinations = Map("A" -> system.actorOf(unreliableProps(3, dst)).path) - val snd = system.actorOf(senderProps(testActor, name, 1000.millis, 5, async = false, destinations), name) + val snd = system.actorOf(senderProps(testActor, name, 1000.millis, 5, 1000, async = false, destinations), name) snd ! Req("a-1") expectMsg(ReqAck) probeA.expectMsg(Action(1, "a-1")) @@ -294,7 +296,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config) val probeA = TestProbe() val probeB = TestProbe() val destinations = Map("A" -> probeA.ref.path, "B" -> probeB.ref.path) - val snd = system.actorOf(senderProps(testActor, name, 500.millis, 3, async = false, destinations), name) + val snd = system.actorOf(senderProps(testActor, name, 500.millis, 3, 1000, async = false, destinations), name) snd ! Req("a-1") snd ! Req("b-1") snd ! Req("b-2") @@ -320,7 +322,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config) "A" -> system.actorOf(unreliableProps(2, dstA), "unreliable-a").path, "B" -> system.actorOf(unreliableProps(5, dstB), "unreliable-b").path, "C" -> system.actorOf(unreliableProps(3, dstC), "unreliable-c").path) - val snd = system.actorOf(senderProps(testActor, name, 1000.millis, 5, async = true, destinations), name) + val snd = system.actorOf(senderProps(testActor, name, 1000.millis, 5, 1000, async = true, destinations), name) val N = 100 for (n ← 1 to N) { snd ! Req("a-" + n) @@ -337,6 +339,32 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config) probeC.receiveN(N, deliverWithin).map { case a: Action ⇒ a.payload }.toSet should be((1 to N).map(n ⇒ "c-" + n).toSet) } + "limit the number of messages redelivered at once" in { + val probeA = TestProbe() + val dst = system.actorOf(destinationProps(probeA.ref)) + val destinations = Map("A" -> system.actorOf(unreliableProps(2, dst)).path) + + val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, 2, async = true, destinations), name) + + val N = 10 + for (n ← 1 to N) { + snd ! Req("a-" + n) + } + + // initially all odd messages should go through + for (n ← 1 to N if n % 2 == 1) probeA.expectMsg(Action(n, s"a-$n")) + probeA.expectNoMsg(100.millis) + + // at each redelivery round, 2 (even) messages are sent, the first goes through + // without throttling, at each round half of the messages would go through + var toDeliver = (1 to N).filter(_ % 2 == 0).map(_.toLong).toSet + for (n ← 1 to N if n % 2 == 0) { + toDeliver -= probeA.expectMsgType[Action].id + probeA.expectNoMsg(100.millis) + } + + toDeliver should be(Set.empty) + } } }