From 5e0bc91e3462287ce1cd1f912f05628518a56808 Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Tue, 5 Feb 2019 16:29:30 +0000 Subject: [PATCH] Make cluster logging even more standard A few cases the dc logging was inconsistent. The logger has changed in some cases but only in cluster actors of which there are a single instance --- .../scala/akka/cluster/ClusterDaemon.scala | 4 ++-- .../scala/akka/cluster/ClusterHeartbeat.scala | 18 +++++++++--------- .../akka/cluster/CrossDcClusterHeartbeat.scala | 14 ++++++++------ 3 files changed, 19 insertions(+), 17 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index e9a3780220..ec1fa3b133 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -300,9 +300,9 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh import MembershipState._ val cluster = Cluster(context.system) + import cluster.ClusterLogger._ import cluster.{ selfAddress, selfRoles, scheduler, failureDetector, crossDcFailureDetector } import cluster.settings._ - import cluster.ClusterLogger._ val selfDc = cluster.selfDataCenter @@ -981,7 +981,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh } } - logDebug("Receiving gossip from [{}]", selfAddress, from) + logDebug("Receiving gossip from [{}]", from) if (comparison == VectorClock.Concurrent && cluster.settings.Debug.VerboseGossipLogging) { logDebug( diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index a282baf997..9703439a63 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -33,7 +33,7 @@ private[cluster] final class ClusterHeartbeatReceiver extends Actor with ActorLo def receive = { case Heartbeat(from) ⇒ - if (verboseHeartbeat) log.debug("Cluster Node [{}] - Heartbeat from [{}]", cluster.selfAddress, from) + if (verboseHeartbeat) cluster.ClusterLogger.logDebug("Heartbeat from [{}]", from) sender() ! selfHeartbeatRsp } @@ -79,6 +79,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg import ClusterHeartbeatSender._ val cluster = Cluster(context.system) + import cluster.ClusterLogger._ val verboseHeartbeat = cluster.settings.Debug.VerboseHeartbeatLogging import cluster.{ selfAddress, selfUniqueAddress, scheduler } import cluster.settings._ @@ -174,9 +175,9 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg def heartbeat(): Unit = { state.activeReceivers foreach { to ⇒ if (failureDetector.isMonitoring(to.address)) { - if (verboseHeartbeat) log.debug("Cluster Node [{}] - Heartbeat to [{}]", selfAddress, to.address) + if (verboseHeartbeat) logDebug("Heartbeat to [{}]", to.address) } else { - if (verboseHeartbeat) log.debug("Cluster Node [{}] - First Heartbeat to [{}]", selfAddress, to.address) + if (verboseHeartbeat) logDebug("First Heartbeat to [{}]", to.address) // schedule the expected first heartbeat for later, which will give the // other side a chance to reply, and also trigger some resends if needed scheduler.scheduleOnce(HeartbeatExpectedResponseAfter, self, ExpectedFirstHeartbeat(to)) @@ -190,24 +191,23 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg private def checkTickInterval(): Unit = { val now = System.nanoTime() if ((now - tickTimestamp) >= (HeartbeatInterval.toNanos * 2)) - log.warning( - "Cluster Node [{}] - Scheduled sending of heartbeat was delayed. " + + logWarning( + "Scheduled sending of heartbeat was delayed. " + "Previous heartbeat was sent [{}] ms ago, expected interval is [{}] ms. This may cause failure detection " + "to mark members as unreachable. The reason can be thread starvation, e.g. by running blocking tasks on the " + - "default dispatcher, CPU overload, or GC.", - selfAddress, TimeUnit.NANOSECONDS.toMillis(now - tickTimestamp), HeartbeatInterval.toMillis) + "default dispatcher, CPU overload, or GC.", TimeUnit.NANOSECONDS.toMillis(now - tickTimestamp), HeartbeatInterval.toMillis) tickTimestamp = now } def heartbeatRsp(from: UniqueAddress): Unit = { - if (verboseHeartbeat) log.debug("Cluster Node [{}] - Heartbeat response from [{}]", selfAddress, from.address) + if (verboseHeartbeat) logDebug("Heartbeat response from [{}]", from.address) state = state.heartbeatRsp(from) } def triggerFirstHeartbeat(from: UniqueAddress): Unit = if (state.activeReceivers(from) && !failureDetector.isMonitoring(from.address)) { - if (verboseHeartbeat) log.debug("Cluster Node [{}] - Trigger extra expected heartbeat from [{}]", selfAddress, from.address) + if (verboseHeartbeat) logDebug("Trigger extra expected heartbeat from [{}]", from.address) failureDetector.heartbeat(from.address) } diff --git a/akka-cluster/src/main/scala/akka/cluster/CrossDcClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/CrossDcClusterHeartbeat.scala index 5fc630139c..a6f8da8b53 100644 --- a/akka-cluster/src/main/scala/akka/cluster/CrossDcClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/CrossDcClusterHeartbeat.scala @@ -30,13 +30,15 @@ import scala.collection.immutable * This monitoring mode is both simple and predictable, and also uses the assumption that * "nodes which stay around for a long time, become old", and those rarely change. In a way, * they are the "core" of a cluster, while other nodes may be very dynamically changing worked - * nodes which aggresively come and go as the traffic in the service changes. + * nodes which aggressively come and go as the traffic in the service changes. */ @InternalApi private[cluster] final class CrossDcHeartbeatSender extends Actor with ActorLogging { import CrossDcHeartbeatSender._ val cluster = Cluster(context.system) + import cluster.ClusterLogger._ + val verboseHeartbeat = cluster.settings.Debug.VerboseHeartbeatLogging import cluster.settings._ import cluster.{ scheduler, selfAddress, selfDataCenter, selfUniqueAddress } @@ -150,9 +152,9 @@ private[cluster] final class CrossDcHeartbeatSender extends Actor with ActorLogg def heartbeat(): Unit = { dataCentersState.activeReceivers foreach { to ⇒ if (crossDcFailureDetector.isMonitoring(to.address)) { - if (verboseHeartbeat) log.debug("Cluster Node [{}][{}] - (Cross) Heartbeat to [{}]", selfDataCenter, selfAddress, to.address) + if (verboseHeartbeat) logDebug("(Cross) Heartbeat to [{}]", to.address) } else { - if (verboseHeartbeat) log.debug("Cluster Node [{}][{}] - First (Cross) Heartbeat to [{}]", selfDataCenter, selfAddress, to.address) + if (verboseHeartbeat) logDebug("First (Cross) Heartbeat to [{}]", to.address) // schedule the expected first heartbeat for later, which will give the // other side a chance to reply, and also trigger some resends if needed scheduler.scheduleOnce(HeartbeatExpectedResponseAfter, self, ClusterHeartbeatSender.ExpectedFirstHeartbeat(to)) @@ -162,13 +164,13 @@ private[cluster] final class CrossDcHeartbeatSender extends Actor with ActorLogg } def heartbeatRsp(from: UniqueAddress): Unit = { - if (verboseHeartbeat) log.debug("Cluster Node [{}][{}] - (Cross) Heartbeat response from [{}]", selfDataCenter, selfAddress, from.address) + if (verboseHeartbeat) logDebug("(Cross) Heartbeat response from [{}]", from.address) dataCentersState = dataCentersState.heartbeatRsp(from) } def triggerFirstHeartbeat(from: UniqueAddress): Unit = if (dataCentersState.activeReceivers.contains(from) && !crossDcFailureDetector.isMonitoring(from.address)) { - if (verboseHeartbeat) log.debug("Cluster Node [{}][{}] - Trigger extra expected (cross) heartbeat from [{}]", selfAddress, from.address) + if (verboseHeartbeat) logDebug("Trigger extra expected (cross) heartbeat from [{}]", from.address) crossDcFailureDetector.heartbeat(from.address) } @@ -276,7 +278,7 @@ private[cluster] final case class CrossDcHeartbeatingState( .map(_.uniqueAddress).to(immutable.IndexedSeq)).toSet } - /** Lists addresses in diven DataCenter that this node should send heartbeats to */ + /** Lists addresses in given DataCenter that this node should send heartbeats to */ private def activeReceiversIn(dc: DataCenter): Set[UniqueAddress] = if (dc == selfDataCenter) Set.empty // CrossDcHeartbeatSender is not supposed to send within its own Dc else {