diff --git a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala index a56eaa590c..06f9094f13 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala @@ -30,6 +30,7 @@ import akka.actor.ActorRef import akka.dispatch.sysmsg.SystemMessage import scala.util.control.NoStackTrace +import akka.annotation.InternalApi import akka.event.Logging import akka.stream.stage.StageLogging import akka.util.OptionVal @@ -37,7 +38,7 @@ import akka.util.OptionVal /** * INTERNAL API */ -private[remote] object SystemMessageDelivery { +@InternalApi private[remote] object SystemMessageDelivery { final case class SystemMessageEnvelope(message: AnyRef, seqNo: Long, ackReplyTo: UniqueAddress) extends ArteryMessage final case class Ack(seqNo: Long, from: UniqueAddress) extends Reply final case class Nack(seqNo: Long, from: UniqueAddress) extends Reply @@ -69,7 +70,7 @@ private[remote] object SystemMessageDelivery { /** * INTERNAL API */ -private[remote] class SystemMessageDelivery( +@InternalApi private[remote] class SystemMessageDelivery( outboundContext: OutboundContext, deadLetters: ActorRef, resendInterval: FiniteDuration, @@ -315,8 +316,16 @@ private[remote] class SystemMessageDelivery( /** * INTERNAL API */ -private[remote] class SystemMessageAcker(inboundContext: InboundContext) extends GraphStage[FlowShape[InboundEnvelope, InboundEnvelope]] { +@InternalApi private[akka] object SystemMessageAcker { + val MaxNegativeAcknowledgementLogging = 1000 +} + +/** + * INTERNAL API + */ +@InternalApi private[remote] class SystemMessageAcker(inboundContext: InboundContext) extends GraphStage[FlowShape[InboundEnvelope, InboundEnvelope]] { import SystemMessageDelivery._ + import SystemMessageAcker._ val in: Inlet[InboundEnvelope] = Inlet("SystemMessageAcker.in") val out: Outlet[InboundEnvelope] = Outlet("SystemMessageAcker.out") @@ -327,6 +336,7 @@ private[remote] class SystemMessageAcker(inboundContext: InboundContext) extends // TODO we might need have to prune old unused entries var sequenceNumbers = Map.empty[UniqueAddress, Long] + var nackCount = 0 def localAddress = inboundContext.localAddress @@ -361,9 +371,16 @@ private[remote] class SystemMessageAcker(inboundContext: InboundContext) extends inboundContext.sendControl(ackReplyTo.address, Ack(expectedSeqNo - 1, localAddress)) pull(in) } else { - log.warning( - "Sending negative acknowledgement of system message [{}] from [{}], highest acknowledged [{}]", - n, fromRemoteAddressStr, expectedSeqNo - 1) + if (nackCount < MaxNegativeAcknowledgementLogging) { + nackCount += 1 + val maxNackReached = + if (nackCount == MaxNegativeAcknowledgementLogging) + s". This happened [$MaxNegativeAcknowledgementLogging] times and will not be logged more." + else "" + log.warning( + "Sending negative acknowledgement of system message [{}] from [{}], highest acknowledged [{}]{}", + n, fromRemoteAddressStr, expectedSeqNo - 1, maxNackReached) + } inboundContext.sendControl(ackReplyTo.address, Nack(expectedSeqNo - 1, localAddress)) pull(in) }