Merge pull request #25154 from akka/wip-25143-neg-ack-log-patriknw

limit negative acknowledgement logging, #25143
This commit is contained in:
Patrik Nordwall 2018-05-28 11:45:26 +02:00 committed by GitHub
commit 1919f222fa
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -30,6 +30,7 @@ import akka.actor.ActorRef
import akka.dispatch.sysmsg.SystemMessage
import scala.util.control.NoStackTrace
import akka.annotation.InternalApi
import akka.event.Logging
import akka.stream.stage.StageLogging
import akka.util.OptionVal
@ -37,7 +38,7 @@ import akka.util.OptionVal
/**
* INTERNAL API
*/
private[remote] object SystemMessageDelivery {
@InternalApi private[remote] object SystemMessageDelivery {
final case class SystemMessageEnvelope(message: AnyRef, seqNo: Long, ackReplyTo: UniqueAddress) extends ArteryMessage
final case class Ack(seqNo: Long, from: UniqueAddress) extends Reply
final case class Nack(seqNo: Long, from: UniqueAddress) extends Reply
@ -69,7 +70,7 @@ private[remote] object SystemMessageDelivery {
/**
* INTERNAL API
*/
private[remote] class SystemMessageDelivery(
@InternalApi private[remote] class SystemMessageDelivery(
outboundContext: OutboundContext,
deadLetters: ActorRef,
resendInterval: FiniteDuration,
@ -315,8 +316,16 @@ private[remote] class SystemMessageDelivery(
/**
* INTERNAL API
*/
private[remote] class SystemMessageAcker(inboundContext: InboundContext) extends GraphStage[FlowShape[InboundEnvelope, InboundEnvelope]] {
@InternalApi private[akka] object SystemMessageAcker {
val MaxNegativeAcknowledgementLogging = 1000
}
/**
* INTERNAL API
*/
@InternalApi private[remote] class SystemMessageAcker(inboundContext: InboundContext) extends GraphStage[FlowShape[InboundEnvelope, InboundEnvelope]] {
import SystemMessageDelivery._
import SystemMessageAcker._
val in: Inlet[InboundEnvelope] = Inlet("SystemMessageAcker.in")
val out: Outlet[InboundEnvelope] = Outlet("SystemMessageAcker.out")
@ -327,6 +336,7 @@ private[remote] class SystemMessageAcker(inboundContext: InboundContext) extends
// TODO we might need have to prune old unused entries
var sequenceNumbers = Map.empty[UniqueAddress, Long]
var nackCount = 0
def localAddress = inboundContext.localAddress
@ -361,9 +371,16 @@ private[remote] class SystemMessageAcker(inboundContext: InboundContext) extends
inboundContext.sendControl(ackReplyTo.address, Ack(expectedSeqNo - 1, localAddress))
pull(in)
} else {
log.warning(
"Sending negative acknowledgement of system message [{}] from [{}], highest acknowledged [{}]",
n, fromRemoteAddressStr, expectedSeqNo - 1)
if (nackCount < MaxNegativeAcknowledgementLogging) {
nackCount += 1
val maxNackReached =
if (nackCount == MaxNegativeAcknowledgementLogging)
s". This happened [$MaxNegativeAcknowledgementLogging] times and will not be logged more."
else ""
log.warning(
"Sending negative acknowledgement of system message [{}] from [{}], highest acknowledged [{}]{}",
n, fromRemoteAddressStr, expectedSeqNo - 1, maxNackReached)
}
inboundContext.sendControl(ackReplyTo.address, Nack(expectedSeqNo - 1, localAddress))
pull(in)
}