diff --git a/akka-remote/src/main/scala/akka/remote/AckedDelivery.scala b/akka-remote/src/main/scala/akka/remote/AckedDelivery.scala index 71db2ce11a..32395d3a62 100644 --- a/akka-remote/src/main/scala/akka/remote/AckedDelivery.scala +++ b/akka-remote/src/main/scala/akka/remote/AckedDelivery.scala @@ -72,6 +72,10 @@ case class Ack(cumulativeAck: SeqNo, nacks: Set[SeqNo] = Set.empty) { class ResendBufferCapacityReachedException(c: Int) extends AkkaException(s"Resend buffer capacity of [$c] has been reached.") +class ResendUnfulfillableException + extends AkkaException("Unable to fulfill resend request since negatively acknowledged payload is no longer in buffer. " + + "The resend states between two systems are compromised and cannot be recovered.") + /** * Implements an immutable resend buffer that buffers messages until they have been acknowledged. Properly removes messages * when an ack is received. This buffer works together with [[akka.remote.AckedReceiveBuffer]] on the receiving end. @@ -94,9 +98,13 @@ case class AckedSendBuffer[T <: HasSequenceNumber]( * @param ack The received acknowledgement * @return An updated buffer containing the remaining unacknowledged messages */ - def acknowledge(ack: Ack): AckedSendBuffer[T] = this.copy( - nonAcked = nonAcked.filter { m ⇒ m.seq > ack.cumulativeAck }, - nacked = (nacked ++ nonAcked) filter { m ⇒ ack.nacks(m.seq) }) + def acknowledge(ack: Ack): AckedSendBuffer[T] = { + val newNacked = (nacked ++ nonAcked) filter { m ⇒ ack.nacks(m.seq) } + if (newNacked.size < ack.nacks.size) throw new ResendUnfulfillableException + else this.copy( + nonAcked = nonAcked.filter { m ⇒ m.seq > ack.cumulativeAck }, + nacked = newNacked) + } /** * Puts a new message in the buffer. Throws [[java.lang.IllegalArgumentException]] if an out-of-sequence message diff --git a/akka-remote/src/test/scala/akka/remote/AckedDeliverySpec.scala b/akka-remote/src/test/scala/akka/remote/AckedDeliverySpec.scala index dbd743978a..68a4a38b30 100644 --- a/akka-remote/src/test/scala/akka/remote/AckedDeliverySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/AckedDeliverySpec.scala @@ -169,6 +169,17 @@ class AckedDeliverySpec extends AkkaSpec { b7.nacked must be === Vector.empty } + "throw exception if non-buffered sequence number is NACKed" in { + val b0 = new AckedSendBuffer[Sequenced](10) + val msg1 = msg(1) + val msg2 = msg(2) + + val b1 = b0.buffer(msg1).buffer(msg2) + intercept[ResendUnfulfillableException] { + b1.acknowledge(Ack(SeqNo(2), nacks = Set(SeqNo(0)))) + } + } + } "ReceiveBuffer" must {