=rem #17554 Improve flow control of system message delivery

When watching many (5000) actors at the same time the
following problems were found:

* first send of a sys msg is sent without any flow control
  => limit the number of outstanding sys msg by using
     the buffer to send them later (ordinary resend)
* when msg cannot be written sys msg is dropped (relying on resend),
  but that cause message re-ordering and negative acknowledgment,
  which is very costly
  => buffer the sys msg on write failure
  => minor optimization of AckedReceiveBuffer

I also made the resend-limit configurable.

(cherry picked from commit ecfc271e9a9d7efcf76945632d89c78740291cc6)
This commit is contained in:
Patrik Nordwall 2015-05-27 14:22:16 +02:00
parent 5fe64ecd4a
commit dec53381b6
6 changed files with 27 additions and 14 deletions

View file

@ -267,6 +267,14 @@ akka {
# Messages that were negatively acknowledged are always immediately
# resent.
resend-interval = 2 s
# Maximum number of unacknowledged system messages that will be resent
# each 'resend-interval'. If you watch many (> 1000) remote actors you can
# increase this value to for example 600, but a too large limit (e.g. 10000)
# may flood the connection and might cause false failure detection to trigger.
# Test such a configuration by watching all actors at the same time and stop
# all watched actors at the same time.
resend-limit = 200
# WARNING: this setting should not be not changed unless all of its consequences
# are properly understood which assumes experience with remoting internals

View file

@ -101,7 +101,9 @@ final case class AckedSendBuffer[T <: HasSequenceNumber](
def acknowledge(ack: Ack): AckedSendBuffer[T] = {
if (ack.cumulativeAck > maxSeq)
throw new IllegalArgumentException(s"Highest SEQ so far was $maxSeq but cumulative ACK is ${ack.cumulativeAck}")
val newNacked = (nacked ++ nonAcked) filter { m ack.nacks(m.seq) }
val newNacked =
if (ack.nacks.isEmpty) Vector.empty
else (nacked ++ nonAcked) filter { m ack.nacks(m.seq) }
if (newNacked.size < ack.nacks.size) throw new ResendUnfulfillableException
else this.copy(
nonAcked = nonAcked.filter { m m.seq > ack.cumulativeAck },
@ -181,7 +183,8 @@ final case class AckedReceiveBuffer[T <: HasSequenceNumber](
prev = bufferedMsg.seq
}
(this.copy(buf = buf filterNot deliver.contains, lastDelivered = updatedLastDelivered), deliver, ack)
val newBuf = if (deliver.isEmpty) buf else buf.filterNot(deliver.contains)
(this.copy(buf = newBuf, lastDelivered = updatedLastDelivered), deliver, ack)
}
/**
@ -199,4 +202,4 @@ final case class AckedReceiveBuffer[T <: HasSequenceNumber](
}
override def toString = buf.map { _.seq }.mkString("[", ", ", "]")
}
}

View file

@ -200,13 +200,6 @@ private[remote] class ReliableDeliverySupervisor(
val autoResendTimer = context.system.scheduler.schedule(
settings.SysResendTimeout, settings.SysResendTimeout, self, AttemptSysMsgRedelivery)
// Aim for a maximum of 100 resend/s = 0.1 resend/ms
val maxResendRate = 100.0
val resendLimit =
math.min(
1000,
math.max((settings.SysResendTimeout.toMillis * (maxResendRate / 1000.0)).toInt, 1))
override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false) {
case e @ (_: AssociationProblem) Escalate
case NonFatal(e)
@ -372,8 +365,9 @@ private[remote] class ReliableDeliverySupervisor(
val sequencedSend = send.copy(seqOpt = Some(nextSeq()))
tryBuffer(sequencedSend)
// If we have not confirmed the remote UID we cannot transfer the system message at this point just buffer it.
// GotUid will kick resendAll() causing the messages to be properly written
if (uidConfirmed)
// GotUid will kick resendAll() causing the messages to be properly written.
// Flow control by not sending more when we already have many outstanding.
if (uidConfirmed && resendBuffer.nonAcked.size <= settings.SysResendLimit)
writer ! sequencedSend
} else writer ! send
@ -381,7 +375,7 @@ private[remote] class ReliableDeliverySupervisor(
private def resendAll(): Unit = {
resendNacked()
resendBuffer.nonAcked.take(resendLimit) foreach { writer ! _ }
resendBuffer.nonAcked.take(settings.SysResendLimit) foreach { writer ! _ }
}
private def tryBuffer(s: Send): Unit =
@ -716,7 +710,7 @@ private[remote] class EndpointWriter(
val writing: Receive = {
case s: Send
if (!writeSend(s)) {
if (s.seqOpt.isEmpty) enqueueInBuffer(s)
enqueueInBuffer(s)
scheduleBackoffTimer()
context.become(buffering)
}

View file

@ -80,6 +80,10 @@ final class RemoteSettings(val config: Config) {
config.getMillisDuration("akka.remote.resend-interval")
} requiring (_ > Duration.Zero, "resend-interval must be > 0")
val SysResendLimit: Int = {
config.getInt("akka.remote.resend-limit")
} requiring (_ > 0, "resend-limit must be > 0")
val SysMsgBufferSize: Int = {
getInt("akka.remote.system-message-buffer-size")
} requiring (_ > 0, "system-message-buffer-size must be > 0")

View file

@ -39,6 +39,7 @@ class RemoteConfigSpec extends AkkaSpec(
LogBufferSizeExceeding should ===(50000)
SysMsgAckTimeout should ===(0.3 seconds)
SysResendTimeout should ===(2 seconds)
SysResendLimit should ===(200)
SysMsgBufferSize should ===(20000)
InitialSysMsgDeliveryTimeout should ===(3 minutes)
QuarantineDuration should ===(5 days)