=per #15590 Increase efforts to in-order on start of ALOD

AtLeastOnceDelivery can delivery out-of-order, and that's OK.
Although, in the case of message replay followed by taking user land
commands which may trigger `deliver` calls, it is nicer to at least once
try to send the replayed but not confirmed deliveries *first*, before
sending the completely new deliveries.

This change acomplishes this by triggering redelivery explicitly when
recovery has finished, and setting the timestamps on these messages a
bit in the past, so they hit their redelivery deadline right away during
this recovery induced redelivery.

Resolves #15590
This commit is contained in:
Konrad 'ktoso' Malawski 2014-07-29 17:28:42 +02:00
parent be88399ad5
commit fdb0d9ee3b
2 changed files with 52 additions and 5 deletions

View file

@ -3,6 +3,8 @@
*/
package akka.persistence
import akka.persistence.JournalProtocol.ReplayMessagesSuccess
import scala.annotation.tailrec
import scala.collection.breakOut
import scala.collection.immutable
@ -189,8 +191,9 @@ trait AtLeastOnceDelivery extends Processor {
s"Too many unconfirmed messages, maximum allowed is [$maxUnconfirmedMessages]")
val deliveryId = nextDeliverySequenceNr()
val now = System.nanoTime()
val now = if (recoveryRunning) { System.nanoTime() - redeliverInterval.toNanos } else System.nanoTime()
val d = Delivery(destination, deliveryIdToMessage(deliveryId), now, attempt = 0)
if (recoveryRunning)
unconfirmed = unconfirmed.updated(deliveryId, d)
else
@ -219,14 +222,17 @@ trait AtLeastOnceDelivery extends Processor {
val now = System.nanoTime()
val deadline = now - redeliverInterval.toNanos
var warnings = Vector.empty[UnconfirmedDelivery]
unconfirmed foreach {
case (deliveryId, delivery)
if (delivery.timestamp <= deadline) {
send(deliveryId, delivery, now)
if (delivery.attempt == warnAfterNumberOfUnconfirmedAttempts)
warnings :+= UnconfirmedDelivery(deliveryId, delivery.destination, delivery.message)
}
}
if (warnings.nonEmpty)
self ! UnconfirmedWarning(warnings)
}
@ -283,8 +289,15 @@ trait AtLeastOnceDelivery extends Processor {
*/
override protected[akka] def aroundReceive(receive: Receive, message: Any): Unit =
message match {
case RedeliveryTick redeliverOverdue()
case _ super.aroundReceive(receive, message)
case ReplayMessagesSuccess
redeliverOverdue()
super.aroundReceive(receive, message)
case RedeliveryTick
redeliverOverdue()
case x
super.aroundReceive(receive, message)
}
}

View file

@ -218,6 +218,41 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config)
probeA.expectNoMsg(1.second)
}
"re-send replayed deliveries with an 'initially in-order' strategy, before delivering fresh messages" in {
val probeA = TestProbe()
val dst = system.actorOf(destinationProps(probeA.ref))
val destinations = Map("A" -> system.actorOf(unreliableProps(2, dst)).path)
val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, async = false, destinations), name)
snd ! Req("a-1")
expectMsg(ReqAck)
probeA.expectMsg(Action(1, "a-1"))
snd ! Req("a-2")
expectMsg(ReqAck)
// a-2 was lost
snd ! Req("a-3")
expectMsg(ReqAck)
probeA.expectMsg(Action(3, "a-3"))
snd ! Req("a-4")
expectMsg(ReqAck)
// a-4 was lost
// trigger restart
snd ! Boom
snd ! Req("a-5")
expectMsg(ReqAck)
// and then re-delivered
probeA.expectMsg(Action(2, "a-2")) // re-delivered
// a-4 was re-delivered but lost
probeA.expectMsg(Action(5, "a-5")) // re-delivered
probeA.expectMsg(Action(4, "a-4")) // re-delivered, 3rd time
probeA.expectNoMsg(1.second)
}
"restore state from snapshot" in {
val probeA = TestProbe()
val dst = system.actorOf(destinationProps(probeA.ref))
@ -307,8 +342,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config)
class LeveldbAtLeastOnceDeliverySpec extends AtLeastOnceDeliverySpec(
// TODO disable debug logging once happy with stability of this test
ConfigFactory.parseString("""akka.logLevel = DEBUG""") withFallback PersistenceSpec.config("leveldb", "AtLeastOnceDeliverySpec")
)
ConfigFactory.parseString("""akka.logLevel = DEBUG""") withFallback PersistenceSpec.config("leveldb", "AtLeastOnceDeliverySpec"))
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class InmemAtLeastOnceDeliverySpec extends AtLeastOnceDeliverySpec(PersistenceSpec.config("inmem", "AtLeastOnceDeliverySpec"))