=rem #3527 Fail fatally when negative acknowledgement is unfulfillable

This commit is contained in:
Endre Sándor Varga 2013-08-19 16:50:41 +02:00 committed by Patrik Nordwall
parent cc15919512
commit d85039c1a6
2 changed files with 22 additions and 3 deletions

View file

@ -72,6 +72,10 @@ case class Ack(cumulativeAck: SeqNo, nacks: Set[SeqNo] = Set.empty) {
class ResendBufferCapacityReachedException(c: Int)
extends AkkaException(s"Resend buffer capacity of [$c] has been reached.")
class ResendUnfulfillableException
extends AkkaException("Unable to fulfill resend request since negatively acknowledged payload is no longer in buffer. " +
"The resend states between two systems are compromised and cannot be recovered.")
/**
* Implements an immutable resend buffer that buffers messages until they have been acknowledged. Properly removes messages
* when an ack is received. This buffer works together with [[akka.remote.AckedReceiveBuffer]] on the receiving end.
@ -94,9 +98,13 @@ case class AckedSendBuffer[T <: HasSequenceNumber](
* @param ack The received acknowledgement
* @return An updated buffer containing the remaining unacknowledged messages
*/
def acknowledge(ack: Ack): AckedSendBuffer[T] = this.copy(
nonAcked = nonAcked.filter { m m.seq > ack.cumulativeAck },
nacked = (nacked ++ nonAcked) filter { m ack.nacks(m.seq) })
def acknowledge(ack: Ack): AckedSendBuffer[T] = {
val newNacked = (nacked ++ nonAcked) filter { m ack.nacks(m.seq) }
if (newNacked.size < ack.nacks.size) throw new ResendUnfulfillableException
else this.copy(
nonAcked = nonAcked.filter { m m.seq > ack.cumulativeAck },
nacked = newNacked)
}
/**
* Puts a new message in the buffer. Throws [[java.lang.IllegalArgumentException]] if an out-of-sequence message

View file

@ -169,6 +169,17 @@ class AckedDeliverySpec extends AkkaSpec {
b7.nacked must be === Vector.empty
}
"throw exception if non-buffered sequence number is NACKed" in {
val b0 = new AckedSendBuffer[Sequenced](10)
val msg1 = msg(1)
val msg2 = msg(2)
val b1 = b0.buffer(msg1).buffer(msg2)
intercept[ResendUnfulfillableException] {
b1.acknowledge(Ack(SeqNo(2), nacks = Set(SeqNo(0))))
}
}
}
"ReceiveBuffer" must {