Merge pull request #26334 from chbatey/cluster-logging-standard
Make cluster logging even more standard
This commit is contained in:
commit
ec80fd7a56
3 changed files with 19 additions and 17 deletions
|
|
@ -300,9 +300,9 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
|
|||
import MembershipState._
|
||||
|
||||
val cluster = Cluster(context.system)
|
||||
import cluster.ClusterLogger._
|
||||
import cluster.{ selfAddress, selfRoles, scheduler, failureDetector, crossDcFailureDetector }
|
||||
import cluster.settings._
|
||||
import cluster.ClusterLogger._
|
||||
|
||||
val selfDc = cluster.selfDataCenter
|
||||
|
||||
|
|
@ -981,7 +981,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
|
|||
}
|
||||
}
|
||||
|
||||
logDebug("Receiving gossip from [{}]", selfAddress, from)
|
||||
logDebug("Receiving gossip from [{}]", from)
|
||||
|
||||
if (comparison == VectorClock.Concurrent && cluster.settings.Debug.VerboseGossipLogging) {
|
||||
logDebug(
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLo
|
|||
|
||||
def receive = {
|
||||
case Heartbeat(from) ⇒
|
||||
if (verboseHeartbeat) log.debug("Cluster Node [{}] - Heartbeat from [{}]", cluster.selfAddress, from)
|
||||
if (verboseHeartbeat) cluster.ClusterLogger.logDebug("Heartbeat from [{}]", from)
|
||||
sender() ! selfHeartbeatRsp
|
||||
}
|
||||
|
||||
|
|
@ -79,6 +79,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
|
|||
import ClusterHeartbeatSender._
|
||||
|
||||
val cluster = Cluster(context.system)
|
||||
import cluster.ClusterLogger._
|
||||
val verboseHeartbeat = cluster.settings.Debug.VerboseHeartbeatLogging
|
||||
import cluster.{ selfAddress, selfUniqueAddress, scheduler }
|
||||
import cluster.settings._
|
||||
|
|
@ -174,9 +175,9 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
|
|||
def heartbeat(): Unit = {
|
||||
state.activeReceivers foreach { to ⇒
|
||||
if (failureDetector.isMonitoring(to.address)) {
|
||||
if (verboseHeartbeat) log.debug("Cluster Node [{}] - Heartbeat to [{}]", selfAddress, to.address)
|
||||
if (verboseHeartbeat) logDebug("Heartbeat to [{}]", to.address)
|
||||
} else {
|
||||
if (verboseHeartbeat) log.debug("Cluster Node [{}] - First Heartbeat to [{}]", selfAddress, to.address)
|
||||
if (verboseHeartbeat) logDebug("First Heartbeat to [{}]", to.address)
|
||||
// schedule the expected first heartbeat for later, which will give the
|
||||
// other side a chance to reply, and also trigger some resends if needed
|
||||
scheduler.scheduleOnce(HeartbeatExpectedResponseAfter, self, ExpectedFirstHeartbeat(to))
|
||||
|
|
@ -190,24 +191,23 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
|
|||
private def checkTickInterval(): Unit = {
|
||||
val now = System.nanoTime()
|
||||
if ((now - tickTimestamp) >= (HeartbeatInterval.toNanos * 2))
|
||||
log.warning(
|
||||
"Cluster Node [{}] - Scheduled sending of heartbeat was delayed. " +
|
||||
logWarning(
|
||||
"Scheduled sending of heartbeat was delayed. " +
|
||||
"Previous heartbeat was sent [{}] ms ago, expected interval is [{}] ms. This may cause failure detection " +
|
||||
"to mark members as unreachable. The reason can be thread starvation, e.g. by running blocking tasks on the " +
|
||||
"default dispatcher, CPU overload, or GC.",
|
||||
selfAddress, TimeUnit.NANOSECONDS.toMillis(now - tickTimestamp), HeartbeatInterval.toMillis)
|
||||
"default dispatcher, CPU overload, or GC.", TimeUnit.NANOSECONDS.toMillis(now - tickTimestamp), HeartbeatInterval.toMillis)
|
||||
tickTimestamp = now
|
||||
|
||||
}
|
||||
|
||||
def heartbeatRsp(from: UniqueAddress): Unit = {
|
||||
if (verboseHeartbeat) log.debug("Cluster Node [{}] - Heartbeat response from [{}]", selfAddress, from.address)
|
||||
if (verboseHeartbeat) logDebug("Heartbeat response from [{}]", from.address)
|
||||
state = state.heartbeatRsp(from)
|
||||
}
|
||||
|
||||
def triggerFirstHeartbeat(from: UniqueAddress): Unit =
|
||||
if (state.activeReceivers(from) && !failureDetector.isMonitoring(from.address)) {
|
||||
if (verboseHeartbeat) log.debug("Cluster Node [{}] - Trigger extra expected heartbeat from [{}]", selfAddress, from.address)
|
||||
if (verboseHeartbeat) logDebug("Trigger extra expected heartbeat from [{}]", from.address)
|
||||
failureDetector.heartbeat(from.address)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -30,13 +30,15 @@ import scala.collection.immutable
|
|||
* This monitoring mode is both simple and predictable, and also uses the assumption that
|
||||
* "nodes which stay around for a long time, become old", and those rarely change. In a way,
|
||||
* they are the "core" of a cluster, while other nodes may be very dynamically changing worked
|
||||
* nodes which aggresively come and go as the traffic in the service changes.
|
||||
* nodes which aggressively come and go as the traffic in the service changes.
|
||||
*/
|
||||
@InternalApi
|
||||
private[cluster] final class CrossDcHeartbeatSender extends Actor with ActorLogging {
|
||||
import CrossDcHeartbeatSender._
|
||||
|
||||
val cluster = Cluster(context.system)
|
||||
import cluster.ClusterLogger._
|
||||
|
||||
val verboseHeartbeat = cluster.settings.Debug.VerboseHeartbeatLogging
|
||||
import cluster.settings._
|
||||
import cluster.{ scheduler, selfAddress, selfDataCenter, selfUniqueAddress }
|
||||
|
|
@ -150,9 +152,9 @@ private[cluster] final class CrossDcHeartbeatSender extends Actor with ActorLogg
|
|||
def heartbeat(): Unit = {
|
||||
dataCentersState.activeReceivers foreach { to ⇒
|
||||
if (crossDcFailureDetector.isMonitoring(to.address)) {
|
||||
if (verboseHeartbeat) log.debug("Cluster Node [{}][{}] - (Cross) Heartbeat to [{}]", selfDataCenter, selfAddress, to.address)
|
||||
if (verboseHeartbeat) logDebug("(Cross) Heartbeat to [{}]", to.address)
|
||||
} else {
|
||||
if (verboseHeartbeat) log.debug("Cluster Node [{}][{}] - First (Cross) Heartbeat to [{}]", selfDataCenter, selfAddress, to.address)
|
||||
if (verboseHeartbeat) logDebug("First (Cross) Heartbeat to [{}]", to.address)
|
||||
// schedule the expected first heartbeat for later, which will give the
|
||||
// other side a chance to reply, and also trigger some resends if needed
|
||||
scheduler.scheduleOnce(HeartbeatExpectedResponseAfter, self, ClusterHeartbeatSender.ExpectedFirstHeartbeat(to))
|
||||
|
|
@ -162,13 +164,13 @@ private[cluster] final class CrossDcHeartbeatSender extends Actor with ActorLogg
|
|||
}
|
||||
|
||||
def heartbeatRsp(from: UniqueAddress): Unit = {
|
||||
if (verboseHeartbeat) log.debug("Cluster Node [{}][{}] - (Cross) Heartbeat response from [{}]", selfDataCenter, selfAddress, from.address)
|
||||
if (verboseHeartbeat) logDebug("(Cross) Heartbeat response from [{}]", from.address)
|
||||
dataCentersState = dataCentersState.heartbeatRsp(from)
|
||||
}
|
||||
|
||||
def triggerFirstHeartbeat(from: UniqueAddress): Unit =
|
||||
if (dataCentersState.activeReceivers.contains(from) && !crossDcFailureDetector.isMonitoring(from.address)) {
|
||||
if (verboseHeartbeat) log.debug("Cluster Node [{}][{}] - Trigger extra expected (cross) heartbeat from [{}]", selfAddress, from.address)
|
||||
if (verboseHeartbeat) logDebug("Trigger extra expected (cross) heartbeat from [{}]", from.address)
|
||||
crossDcFailureDetector.heartbeat(from.address)
|
||||
}
|
||||
|
||||
|
|
@ -276,7 +278,7 @@ private[cluster] final case class CrossDcHeartbeatingState(
|
|||
.map(_.uniqueAddress).to(immutable.IndexedSeq)).toSet
|
||||
}
|
||||
|
||||
/** Lists addresses in diven DataCenter that this node should send heartbeats to */
|
||||
/** Lists addresses in given DataCenter that this node should send heartbeats to */
|
||||
private def activeReceiversIn(dc: DataCenter): Set[UniqueAddress] =
|
||||
if (dc == selfDataCenter) Set.empty // CrossDcHeartbeatSender is not supposed to send within its own Dc
|
||||
else {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue