diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 76a88f613f..063188431d 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -267,6 +267,14 @@ akka { # Messages that were negatively acknowledged are always immediately # resent. resend-interval = 2 s + + # Maximum number of unacknowledged system messages that will be resent + # each 'resend-interval'. If you watch many (> 1000) remote actors you can + # increase this value to, for example, 600, but too large a limit (e.g. 10000) + # may flood the connection and might cause false failure detection to trigger. + # Test such a configuration by watching all actors at the same time and then + # stopping all watched actors at the same time. + resend-limit = 200 # WARNING: this setting should not be not changed unless all of its consequences # are properly understood which assumes experience with remoting internals diff --git a/akka-remote/src/main/scala/akka/remote/AckedDelivery.scala b/akka-remote/src/main/scala/akka/remote/AckedDelivery.scala index 677e1a11e3..7a0de8f2af 100644 --- a/akka-remote/src/main/scala/akka/remote/AckedDelivery.scala +++ b/akka-remote/src/main/scala/akka/remote/AckedDelivery.scala @@ -101,7 +101,9 @@ final case class AckedSendBuffer[T <: HasSequenceNumber]( def acknowledge(ack: Ack): AckedSendBuffer[T] = { if (ack.cumulativeAck > maxSeq) throw new IllegalArgumentException(s"Highest SEQ so far was $maxSeq but cumulative ACK is ${ack.cumulativeAck}") - val newNacked = (nacked ++ nonAcked) filter { m ⇒ ack.nacks(m.seq) } + val newNacked = + if (ack.nacks.isEmpty) Vector.empty + else (nacked ++ nonAcked) filter { m ⇒ ack.nacks(m.seq) } if (newNacked.size < ack.nacks.size) throw new ResendUnfulfillableException else this.copy( nonAcked = nonAcked.filter { m ⇒ m.seq > ack.cumulativeAck }, @@ -181,7 +183,8 @@ final case class AckedReceiveBuffer[T <: HasSequenceNumber]( prev = 
bufferedMsg.seq } - (this.copy(buf = buf filterNot deliver.contains, lastDelivered = updatedLastDelivered), deliver, ack) + val newBuf = if (deliver.isEmpty) buf else buf.filterNot(deliver.contains) + (this.copy(buf = newBuf, lastDelivered = updatedLastDelivered), deliver, ack) } /** @@ -199,4 +202,4 @@ final case class AckedReceiveBuffer[T <: HasSequenceNumber]( } override def toString = buf.map { _.seq }.mkString("[", ", ", "]") -} \ No newline at end of file +} diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index ae6ecda855..a99cada916 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -200,13 +200,6 @@ private[remote] class ReliableDeliverySupervisor( val autoResendTimer = context.system.scheduler.schedule( settings.SysResendTimeout, settings.SysResendTimeout, self, AttemptSysMsgRedelivery) - // Aim for a maximum of 100 resend/s = 0.1 resend/ms - val maxResendRate = 100.0 - val resendLimit = - math.min( - 1000, - math.max((settings.SysResendTimeout.toMillis * (maxResendRate / 1000.0)).toInt, 1)) - override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false) { case e @ (_: AssociationProblem) ⇒ Escalate case NonFatal(e) ⇒ @@ -372,8 +365,9 @@ private[remote] class ReliableDeliverySupervisor( val sequencedSend = send.copy(seqOpt = Some(nextSeq())) tryBuffer(sequencedSend) // If we have not confirmed the remote UID we cannot transfer the system message at this point just buffer it. - // GotUid will kick resendAll() causing the messages to be properly written - if (uidConfirmed) + // GotUid will kick resendAll() causing the messages to be properly written. + // Flow control by not sending more when we already have many outstanding. + if (uidConfirmed && resendBuffer.nonAcked.size <= settings.SysResendLimit) writer ! sequencedSend } else writer ! 
send @@ -381,7 +375,7 @@ private[remote] class ReliableDeliverySupervisor( private def resendAll(): Unit = { resendNacked() - resendBuffer.nonAcked.take(resendLimit) foreach { writer ! _ } + resendBuffer.nonAcked.take(settings.SysResendLimit) foreach { writer ! _ } } private def tryBuffer(s: Send): Unit = @@ -716,7 +710,7 @@ private[remote] class EndpointWriter( val writing: Receive = { case s: Send ⇒ if (!writeSend(s)) { - if (s.seqOpt.isEmpty) enqueueInBuffer(s) + enqueueInBuffer(s) scheduleBackoffTimer() context.become(buffering) } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index 11610f62de..4d655e2abe 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -80,6 +80,10 @@ final class RemoteSettings(val config: Config) { config.getMillisDuration("akka.remote.resend-interval") } requiring (_ > Duration.Zero, "resend-interval must be > 0") + val SysResendLimit: Int = { + config.getInt("akka.remote.resend-limit") + } requiring (_ > 0, "resend-limit must be > 0") + val SysMsgBufferSize: Int = { getInt("akka.remote.system-message-buffer-size") } requiring (_ > 0, "system-message-buffer-size must be > 0") diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index c0a7538d0b..eb2b644d74 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -39,6 +39,7 @@ class RemoteConfigSpec extends AkkaSpec( LogBufferSizeExceeding should ===(50000) SysMsgAckTimeout should ===(0.3 seconds) SysResendTimeout should ===(2 seconds) + SysResendLimit should ===(200) SysMsgBufferSize should ===(20000) InitialSysMsgDeliveryTimeout should ===(3 minutes) QuarantineDuration should ===(5 days) diff --git a/project/MiMa.scala b/project/MiMa.scala 
index 0d48c14227..2907bd13d4 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -523,6 +523,9 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[MissingMethodProblem]("akka.dispatch.MessageDispatcher.akka$dispatch$BatchingExecutor$$_blockContext"), // issue #16736 ProblemFilters.exclude[MissingClassProblem]("akka.cluster.OnMemberUpListener"), + // issue #17554 + ProblemFilters.exclude[MissingMethodProblem]("akka.remote.ReliableDeliverySupervisor.maxResendRate"), + ProblemFilters.exclude[MissingMethodProblem]("akka.remote.ReliableDeliverySupervisor.resendLimit"), //changes introduced by #16911 ProblemFilters.exclude[MissingMethodProblem]("akka.remote.RemoteActorRefProvider.afterSendSystemMessage"),