=cls #16090 Add more logging for buffering and unavailable coordinator

This commit is contained in:
Patrik Nordwall 2015-08-13 10:46:40 +02:00
parent 1d5f3726c9
commit c9f0021297

View file

@ -255,6 +255,7 @@ class ShardRegion(
var regions = Map.empty[ActorRef, Set[ShardId]]
var regionByShard = Map.empty[ShardId, ActorRef]
var shardBuffers = Map.empty[ShardId, Vector[(Msg, ActorRef)]]
var loggedFullBufferWarning = false
var shards = Map.empty[ShardId, ActorRef]
var shardsByRef = Map.empty[ActorRef, ShardId]
var handingOff = Set.empty[ActorRef]
@ -264,6 +265,7 @@ class ShardRegion(
import context.dispatcher
val retryTask = context.system.scheduler.schedule(retryInterval, retryInterval, self, Retry)
var retryCount = 0
// subscribe to MemberEvent, re-subscribe when restart
override def preStart(): Unit = {
@ -376,8 +378,10 @@ class ShardRegion(
// must drop requests that came in between the BeginHandOff and now,
// because they might be forwarded from other regions and there
// is a risk or message re-ordering otherwise
if (shardBuffers.contains(shard))
if (shardBuffers.contains(shard)) {
shardBuffers -= shard
loggedFullBufferWarning = false
}
if (shards.contains(shard)) {
handingOff += shards(shard)
@ -391,6 +395,8 @@ class ShardRegion(
def receiveCommand(cmd: ShardRegionCommand): Unit = cmd match {
case Retry
if (shardBuffers.nonEmpty)
retryCount += 1
if (coordinator.isEmpty)
register()
else {
@ -444,6 +450,9 @@ class ShardRegion(
def register(): Unit = {
coordinatorSelection.foreach(_ ! registrationMessage)
if (shardBuffers.nonEmpty && retryCount >= 5)
log.warning("Trying to register to coordinator at [{}], but no acknoledgment. Total [{}] buffered messages.",
coordinatorSelection, totalBufferSize)
}
def registrationMessage: Any =
@ -451,8 +460,13 @@ class ShardRegion(
def requestShardBufferHomes(): Unit = {
shardBuffers.foreach {
case (shard, _) coordinator.foreach { c
log.debug("Retry request for shard [{}] homes", shard)
case (shard, buf) coordinator.foreach { c
val logMsg = "Retry request for shard [{}] homes from coordinator at [{}]. [{}] buffered messages."
if (retryCount >= 5)
log.warning(logMsg, shard, c, buf.size)
else
log.debug(logMsg, shard, c, buf.size)
c ! GetShardHome(shard)
}
}
@ -461,10 +475,13 @@ class ShardRegion(
def deliverBufferedMessages(shard: String): Unit = {
shardBuffers.get(shard) match {
case Some(buf)
log.debug("Deliver [{}] buffered messages for shard [{}]", buf.size, shard)
buf.foreach { case (msg, snd) deliverMessage(msg, snd) }
shardBuffers -= shard
case None
}
loggedFullBufferWarning = false
retryCount = 0
}
def deliverMessage(msg: Any, snd: ActorRef): Unit =
@ -480,6 +497,7 @@ class ShardRegion(
coordinator.foreach(_ ! GetShardHome(shardId))
}
val buf = shardBuffers.getOrElse(shardId, Vector.empty)
log.debug("Buffer message for shard [{}]. Total [{}] buffered messages.", shardId, buf.size + 1)
shardBuffers = shardBuffers.updated(shardId, buf :+ ((msg, snd)))
}
@ -499,12 +517,28 @@ class ShardRegion(
log.debug("Request shard [{}] home", shardId)
coordinator.foreach(_ ! GetShardHome(shardId))
}
if (totalBufferSize >= bufferSize) {
log.debug("Buffer is full, dropping message for shard [{}]", shardId)
val totBufSize = totalBufferSize
if (totBufSize >= bufferSize) {
if (loggedFullBufferWarning)
log.debug("Buffer is full, dropping message for shard [{}]", shardId)
else {
log.warning("Buffer is full, dropping message for shard [{}]", shardId)
loggedFullBufferWarning = true
}
context.system.deadLetters ! msg
} else {
val buf = shardBuffers.getOrElse(shardId, Vector.empty)
shardBuffers = shardBuffers.updated(shardId, buf :+ ((msg, snd)))
// log some insight to how buffers are filled up every 10% of the buffer capacity
val tot = totBufSize + 1
if (tot % (bufferSize / 10) == 0) {
val logMsg = s"ShardRegion for [$typeName] is using [${100.0 * tot / bufferSize} %] of its buffer capacity."
if (tot <= bufferSize / 2)
log.info(logMsg)
else
log.warning(logMsg + " The coordinator might not be available. You might want to check cluster membership status.")
}
}
}
}